summaryrefslogtreecommitdiff
path: root/modules/http2
diff options
context:
space:
mode:
authorStefan Eissing <icing@apache.org>2022-06-17 09:24:57 +0000
committerStefan Eissing <icing@apache.org>2022-06-17 09:24:57 +0000
commitff2ed5d73957148453b0b5bfd9ec4c311101473f (patch)
tree31803642a264b4edc33de579a4fe9eb843c60b1e /modules/http2
parent83c32adbe54074f113246e182acf7415b120b698 (diff)
downloadhttpd-ff2ed5d73957148453b0b5bfd9ec4c311101473f.tar.gz
*) mod_http2: new implementation of h2 worker pool.
- O(1) cost at registration of connection processing producers - no limit on registered producers - join of ongoing work on unregister - callbacks to unlink dependencies into other h2 code - memory cleanup on workers deactivation (on idle timeouts) - idle_limit as apr_time_t instead of seconds git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1902005 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'modules/http2')
-rw-r--r--modules/http2/h2_c1.c13
-rw-r--r--modules/http2/h2_config.c64
-rw-r--r--modules/http2/h2_config.h5
-rw-r--r--modules/http2/h2_mplx.c63
-rw-r--r--modules/http2/h2_mplx.h10
-rw-r--r--modules/http2/h2_workers.c694
-rw-r--r--modules/http2/h2_workers.h111
-rw-r--r--modules/http2/mod_http2.c4
8 files changed, 539 insertions, 425 deletions
diff --git a/modules/http2/h2_c1.c b/modules/http2/h2_c1.c
index 1dc0de7c60..7662a0e4fe 100644
--- a/modules/http2/h2_c1.c
+++ b/modules/http2/h2_c1.c
@@ -56,11 +56,8 @@ apr_status_t h2_c1_child_init(apr_pool_t *pool, server_rec *s)
{
apr_status_t status = APR_SUCCESS;
int minw, maxw;
- int max_threads_per_child = 0;
- int idle_secs = 0;
+ apr_time_t idle_limit;
- ap_mpm_query(AP_MPMQ_MAX_THREADS, &max_threads_per_child);
-
status = ap_mpm_query(AP_MPMQ_IS_ASYNC, &async_mpm);
if (status != APR_SUCCESS) {
/* some MPMs do not implemnent this */
@@ -70,12 +67,8 @@ apr_status_t h2_c1_child_init(apr_pool_t *pool, server_rec *s)
h2_config_init(pool);
- h2_get_num_workers(s, &minw, &maxw);
- idle_secs = h2_config_sgeti(s, H2_CONF_MAX_WORKER_IDLE_SECS);
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
- "h2_workers: min=%d max=%d, mthrpchild=%d, idle_secs=%d",
- minw, maxw, max_threads_per_child, idle_secs);
- workers = h2_workers_create(s, pool, minw, maxw, idle_secs);
+ h2_get_workers_config(s, &minw, &maxw, &idle_limit);
+ workers = h2_workers_create(s, pool, maxw, minw, idle_limit);
h2_c_logio_add_bytes_in = APR_RETRIEVE_OPTIONAL_FN(ap_logio_add_bytes_in);
h2_c_logio_add_bytes_out = APR_RETRIEVE_OPTIONAL_FN(ap_logio_add_bytes_out);
diff --git a/modules/http2/h2_config.c b/modules/http2/h2_config.c
index 4df058d95d..da1cf79a07 100644
--- a/modules/http2/h2_config.c
+++ b/modules/http2/h2_config.c
@@ -57,7 +57,7 @@ typedef struct h2_config {
int h2_window_size; /* stream window size (http2) */
int min_workers; /* min # of worker threads/child */
int max_workers; /* max # of worker threads/child */
- int max_worker_idle_secs; /* max # of idle seconds for worker */
+ apr_interval_time_t idle_limit; /* max duration for idle workers */
int stream_max_mem_size; /* max # bytes held in memory/stream */
int h2_direct; /* if mod_h2 is active directly */
int modern_tls_only; /* Accept only modern TLS in HTTP/2 connections */
@@ -93,7 +93,7 @@ static h2_config defconf = {
H2_INITIAL_WINDOW_SIZE, /* window_size */
-1, /* min workers */
-1, /* max workers */
- 10 * 60, /* max workers idle secs */
+ apr_time_from_sec(10 * 60), /* workers idle limit */
32 * 1024, /* stream max mem size */
-1, /* h2 direct mode */
1, /* modern TLS only */
@@ -136,7 +136,7 @@ void *h2_config_create_svr(apr_pool_t *pool, server_rec *s)
conf->h2_window_size = DEF_VAL;
conf->min_workers = DEF_VAL;
conf->max_workers = DEF_VAL;
- conf->max_worker_idle_secs = DEF_VAL;
+ conf->idle_limit = DEF_VAL;
conf->stream_max_mem_size = DEF_VAL;
conf->h2_direct = DEF_VAL;
conf->modern_tls_only = DEF_VAL;
@@ -152,7 +152,7 @@ void *h2_config_create_svr(apr_pool_t *pool, server_rec *s)
conf->padding_bits = DEF_VAL;
conf->padding_always = DEF_VAL;
conf->output_buffered = DEF_VAL;
- conf->stream_timeout = DEF_VAL;
+ conf->stream_timeout = DEF_VAL;
return conf;
}
@@ -168,7 +168,7 @@ static void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv)
n->h2_window_size = H2_CONFIG_GET(add, base, h2_window_size);
n->min_workers = H2_CONFIG_GET(add, base, min_workers);
n->max_workers = H2_CONFIG_GET(add, base, max_workers);
- n->max_worker_idle_secs = H2_CONFIG_GET(add, base, max_worker_idle_secs);
+ n->idle_limit = H2_CONFIG_GET(add, base, idle_limit);
n->stream_max_mem_size = H2_CONFIG_GET(add, base, stream_max_mem_size);
n->h2_direct = H2_CONFIG_GET(add, base, h2_direct);
n->modern_tls_only = H2_CONFIG_GET(add, base, modern_tls_only);
@@ -194,7 +194,7 @@ static void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv)
n->early_hints = H2_CONFIG_GET(add, base, early_hints);
n->padding_bits = H2_CONFIG_GET(add, base, padding_bits);
n->padding_always = H2_CONFIG_GET(add, base, padding_always);
- n->stream_timeout = H2_CONFIG_GET(add, base, stream_timeout);
+ n->stream_timeout = H2_CONFIG_GET(add, base, stream_timeout);
return n;
}
@@ -248,8 +248,8 @@ static apr_int64_t h2_srv_config_geti64(const h2_config *conf, h2_config_var_t v
return H2_CONFIG_GET(conf, &defconf, min_workers);
case H2_CONF_MAX_WORKERS:
return H2_CONFIG_GET(conf, &defconf, max_workers);
- case H2_CONF_MAX_WORKER_IDLE_SECS:
- return H2_CONFIG_GET(conf, &defconf, max_worker_idle_secs);
+ case H2_CONF_MAX_WORKER_IDLE_LIMIT:
+ return H2_CONFIG_GET(conf, &defconf, idle_limit);
case H2_CONF_STREAM_MAX_MEM:
return H2_CONFIG_GET(conf, &defconf, stream_max_mem_size);
case H2_CONF_MODERN_TLS_ONLY:
@@ -298,9 +298,6 @@ static void h2_srv_config_seti(h2_config *conf, h2_config_var_t var, int val)
case H2_CONF_MAX_WORKERS:
H2_CONFIG_SET(conf, max_workers, val);
break;
- case H2_CONF_MAX_WORKER_IDLE_SECS:
- H2_CONFIG_SET(conf, max_worker_idle_secs, val);
- break;
case H2_CONF_STREAM_MAX_MEM:
H2_CONFIG_SET(conf, stream_max_mem_size, val);
break;
@@ -354,6 +351,9 @@ static void h2_srv_config_seti64(h2_config *conf, h2_config_var_t var, apr_int64
case H2_CONF_STREAM_TIMEOUT:
H2_CONFIG_SET(conf, stream_timeout, val);
break;
+ case H2_CONF_MAX_WORKER_IDLE_LIMIT:
+ H2_CONFIG_SET(conf, idle_limit, val);
+ break;
default:
h2_srv_config_seti(conf, var, (int)val);
break;
@@ -557,14 +557,15 @@ static const char *h2_conf_set_max_workers(cmd_parms *cmd,
return NULL;
}
-static const char *h2_conf_set_max_worker_idle_secs(cmd_parms *cmd,
- void *dirconf, const char *value)
+static const char *h2_conf_set_max_worker_idle_limit(cmd_parms *cmd,
+ void *dirconf, const char *value)
{
- int val = (int)apr_atoi64(value);
- if (val < 1) {
- return "value must be > 0";
+ apr_interval_time_t timeout;
+ apr_status_t rv = ap_timeout_parameter_parse(value, &timeout, "s");
+ if (rv != APR_SUCCESS) {
+ return "Invalid idle limit value";
}
- CONFIG_CMD_SET(cmd, dirconf, H2_CONF_MAX_WORKER_IDLE_SECS, val);
+ CONFIG_CMD_SET64(cmd, dirconf, H2_CONF_MAX_WORKER_IDLE_LIMIT, timeout);
return NULL;
}
@@ -868,27 +869,22 @@ static const char *h2_conf_set_stream_timeout(cmd_parms *cmd,
return NULL;
}
-void h2_get_num_workers(server_rec *s, int *minw, int *maxw)
+void h2_get_workers_config(server_rec *s, int *pminw, int *pmaxw,
+ apr_time_t *pidle_limit)
{
int threads_per_child = 0;
- *minw = h2_config_sgeti(s, H2_CONF_MIN_WORKERS);
- *maxw = h2_config_sgeti(s, H2_CONF_MAX_WORKERS);
- ap_mpm_query(AP_MPMQ_MAX_THREADS, &threads_per_child);
+ *pminw = h2_config_sgeti(s, H2_CONF_MIN_WORKERS);
+ *pmaxw = h2_config_sgeti(s, H2_CONF_MAX_WORKERS);
- if (*minw <= 0) {
- *minw = threads_per_child;
- }
- if (*maxw <= 0) {
- /* As a default, this seems to work quite well under mpm_event.
- * For people enabling http2 under mpm_prefork, start 4 threads unless
- * configured otherwise. People get unhappy if their http2 requests are
- * blocking each other. */
- *maxw = 3 * (*minw) / 2;
- if (*maxw < 4) {
- *maxw = 4;
- }
+ ap_mpm_query(AP_MPMQ_MAX_THREADS, &threads_per_child);
+ if (*pminw <= 0) {
+ *pminw = threads_per_child;
+ }
+ if (*pmaxw <= 0) {
+ *pmaxw = H2MAX(4, 3 * (*pminw) / 2);
}
+ *pidle_limit = h2_config_sgeti64(s, H2_CONF_MAX_WORKER_IDLE_LIMIT);
}
#define AP_END_CMD AP_INIT_TAKE1(NULL, NULL, NULL, RSRC_CONF, NULL)
@@ -902,7 +898,7 @@ const command_rec h2_cmds[] = {
RSRC_CONF, "minimum number of worker threads per child"),
AP_INIT_TAKE1("H2MaxWorkers", h2_conf_set_max_workers, NULL,
RSRC_CONF, "maximum number of worker threads per child"),
- AP_INIT_TAKE1("H2MaxWorkerIdleSeconds", h2_conf_set_max_worker_idle_secs, NULL,
+ AP_INIT_TAKE1("H2MaxWorkerIdleSeconds", h2_conf_set_max_worker_idle_limit, NULL,
RSRC_CONF, "maximum number of idle seconds before a worker shuts down"),
AP_INIT_TAKE1("H2StreamMaxMemSize", h2_conf_set_stream_max_mem_size, NULL,
RSRC_CONF, "maximum number of bytes buffered in memory for a stream"),
diff --git a/modules/http2/h2_config.h b/modules/http2/h2_config.h
index c150fe21d8..6d2e65f926 100644
--- a/modules/http2/h2_config.h
+++ b/modules/http2/h2_config.h
@@ -28,7 +28,7 @@ typedef enum {
H2_CONF_WIN_SIZE,
H2_CONF_MIN_WORKERS,
H2_CONF_MAX_WORKERS,
- H2_CONF_MAX_WORKER_IDLE_SECS,
+ H2_CONF_MAX_WORKER_IDLE_LIMIT,
H2_CONF_STREAM_MAX_MEM,
H2_CONF_DIRECT,
H2_CONF_MODERN_TLS_ONLY,
@@ -88,7 +88,8 @@ apr_int64_t h2_config_rgeti64(request_rec *r, h2_config_var_t var);
apr_array_header_t *h2_config_push_list(request_rec *r);
-void h2_get_num_workers(server_rec *s, int *minw, int *maxw);
+void h2_get_workers_config(server_rec *s, int *pminw, int *pmaxw,
+ apr_time_t *pidle_limit);
void h2_config_init(apr_pool_t *pool);
const struct h2_priority *h2_cconfig_get_priority(conn_rec *c, const char *content_type);
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 205d19f020..bb6b324115 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -58,6 +58,9 @@ typedef struct {
apr_size_t count;
} stream_iter_ctx;
+static conn_rec *c2_prod_next(void *baton, int *phas_more);
+static void c2_prod_done(void *baton, conn_rec *c2);
+
static void s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx);
static void m_be_annoyed(h2_mplx *m);
@@ -303,7 +306,7 @@ h2_mplx *h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0,
m->q = h2_iq_create(m->pool, m->max_streams);
m->workers = workers;
- m->processing_max = m->max_streams;
+ m->processing_max = H2MIN(h2_workers_get_max_workers(workers), m->max_streams);
m->processing_limit = 6; /* the original h1 max parallel connections */
m->last_mood_change = apr_time_now();
m->mood_update_interval = apr_time_from_msec(100);
@@ -332,6 +335,9 @@ h2_mplx *h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0,
m->max_spare_transits = 3;
m->c2_transits = apr_array_make(m->pool, m->max_spare_transits, sizeof(h2_c2_transit*));
+ m->producer = h2_workers_register(workers, m->pool,
+ apr_psprintf(m->pool, "h2-%d", (int)m->id),
+ c2_prod_next, c2_prod_done, m);
return m;
failure:
@@ -440,8 +446,7 @@ void h2_mplx_c1_destroy(h2_mplx *m)
/* How to shut down a h2 connection:
* 0. abort and tell the workers that no more work will come from us */
m->aborted = 1;
- h2_workers_unregister(m->workers, m);
-
+
H2_MPLX_ENTER_ALWAYS(m);
/* While really terminating any c2 connections, treat the master
@@ -485,6 +490,10 @@ void h2_mplx_c1_destroy(h2_mplx *m)
}
}
+ H2_MPLX_LEAVE(m);
+ h2_workers_join(m->workers, m->producer);
+ H2_MPLX_ENTER_ALWAYS(m);
+
/* 4. With all workers done, all streams should be in spurge */
ap_assert(m->processing_count == 0);
if (!h2_ihash_empty(m->shold)) {
@@ -687,15 +696,13 @@ void h2_mplx_c1_process(h2_mplx *m,
H2_MPLX_MSG(m, "stream %d not found to process"), sid);
}
}
- if (!m->is_registered && !h2_iq_empty(m->q)) {
- m->is_registered = 1;
+ if ((m->processing_count < m->processing_limit) && !h2_iq_empty(m->q)) {
H2_MPLX_LEAVE(m);
- rv = h2_workers_register(m->workers, m);
+ rv = h2_workers_activate(m->workers, m->producer);
H2_MPLX_ENTER_ALWAYS(m);
if (rv != APR_SUCCESS) {
- m->is_registered = 0;
ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10021)
- H2_MPLX_MSG(m, "register at workers"));
+ H2_MPLX_MSG(m, "activate at workers"));
}
}
*pstream_count = (int)h2_ihash_count(m->streams);
@@ -863,24 +870,18 @@ cleanup:
return c2;
}
-apr_status_t h2_mplx_worker_pop_c2(h2_mplx *m, conn_rec **out_c)
+static conn_rec *c2_prod_next(void *baton, int *phas_more)
{
- apr_status_t rv;
+ h2_mplx *m = baton;
+ conn_rec *c = NULL;
H2_MPLX_ENTER_ALWAYS(m);
- if (m->aborted) {
- *out_c = NULL;
- rv = APR_EOF;
- }
- else {
- *out_c = s_next_c2(m);
- rv = (*out_c != NULL && !h2_iq_empty(m->q))? APR_EAGAIN : APR_SUCCESS;
- }
- if (APR_EAGAIN != rv) {
- m->is_registered = 0; /* h2_workers will discard this mplx */
+ if (!m->aborted) {
+ c = s_next_c2(m);
+ *phas_more = (c != NULL && !h2_iq_empty(m->q));
}
H2_MPLX_LEAVE(m);
- return rv;
+ return c;
}
static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
@@ -947,34 +948,18 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
}
}
-void h2_mplx_worker_c2_done(conn_rec *c2)
+static void c2_prod_done(void *baton, conn_rec *c2)
{
+ h2_mplx *m = baton;
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2);
- h2_mplx *m;
AP_DEBUG_ASSERT(conn_ctx);
- m = conn_ctx->mplx;
H2_MPLX_ENTER_ALWAYS(m);
--m->processing_count;
s_c2_done(m, c2, conn_ctx);
if (m->join_wait) apr_thread_cond_signal(m->join_wait);
- if (!m->aborted && !m->is_registered
- && (m->processing_count < m->processing_limit)
- && !h2_iq_empty(m->q)) {
- /* We have a limit on the amount of c2s we process at a time. When
- * this is reached, we do no longer have things to do for h2 workers
- * and they remove such an mplx from its queue.
- * When a c2 is done, there might then be room for more processing
- * and we need then to register this mplx at h2 workers again.
- */
- m->is_registered = 1;
- H2_MPLX_LEAVE(m);
- h2_workers_register(m->workers, m);
- return;
- }
-
H2_MPLX_LEAVE(m);
}
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h
index 42faf051ad..e056acacdd 100644
--- a/modules/http2/h2_mplx.h
+++ b/modules/http2/h2_mplx.h
@@ -44,6 +44,8 @@ struct h2_iqueue;
#include <apr_queue.h>
+#include "h2_workers.h"
+
typedef struct h2_c2_transit h2_c2_transit;
struct h2_c2_transit {
@@ -63,7 +65,7 @@ struct h2_mplx {
int aborted;
int polling; /* is waiting/processing pollset events */
- int is_registered; /* is registered at h2_workers */
+ ap_conn_producer_t *producer; /* registered producer at h2_workers */
struct h2_ihash_t *streams; /* all streams active */
struct h2_ihash_t *shold; /* all streams done with c2 processing ongoing */
@@ -218,12 +220,6 @@ const struct h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id);
*/
apr_status_t h2_mplx_worker_pop_c2(h2_mplx *m, conn_rec **out_c2);
-/**
- * A h2 worker reports a secondary connection processing done.
- * @param c2 the secondary connection finished processing
- */
-void h2_mplx_worker_c2_done(conn_rec *c2);
-
#define H2_MPLX_MSG(m, msg) \
"h2_mplx(%d-%lu): "msg, m->child_num, (unsigned long)m->id
diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c
index 22c31f4f83..c8796aeeac 100644
--- a/modules/http2/h2_workers.c
+++ b/modules/http2/h2_workers.c
@@ -15,7 +15,7 @@
*/
#include <assert.h>
-#include <apr_atomic.h>
+#include <apr_ring.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
@@ -33,309 +33,329 @@
#include "h2_workers.h"
#include "h2_util.h"
+typedef enum {
+ PROD_IDLE,
+ PROD_ACTIVE,
+ PROD_JOINED,
+} prod_state_t;
+
+struct ap_conn_producer_t {
+ APR_RING_ENTRY(ap_conn_producer_t) link;
+ const char *name;
+ void *baton;
+ ap_conn_producer_next *fn_next;
+ ap_conn_producer_done *fn_done;
+ volatile prod_state_t state;
+ volatile int conns_active;
+};
+
+
+typedef enum {
+ H2_SLOT_FREE,
+ H2_SLOT_RUN,
+ H2_SLOT_ZOMBIE,
+} h2_slot_state_t;
+
typedef struct h2_slot h2_slot;
struct h2_slot {
+ APR_RING_ENTRY(h2_slot) link;
int id;
- h2_slot *next;
+ apr_pool_t *pool;
+ h2_slot_state_t state;
+ volatile int should_shutdown;
+ volatile int is_idle;
h2_workers *workers;
- conn_rec *connection;
+ ap_conn_producer_t *prod;
apr_thread_t *thread;
- apr_thread_mutex_t *lock;
- apr_thread_cond_t *not_idle;
- /* atomic */ apr_uint32_t timed_out;
+ struct apr_thread_cond_t *more_work;
+ int activations;
};
-static h2_slot *pop_slot(h2_slot *volatile *phead)
-{
- /* Atomically pop a slot from the list */
- for (;;) {
- h2_slot *first = *phead;
- if (first == NULL) {
- return NULL;
- }
- if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
- first->next = NULL;
- return first;
- }
- }
-}
+struct h2_workers {
+ server_rec *s;
+ apr_pool_t *pool;
+
+ apr_uint32_t max_slots;
+ apr_uint32_t min_active;
+ volatile int idle_limit;
+ volatile int aborted;
+ volatile int shutdown;
+ int dynamic;
+
+ volatile apr_uint32_t active_slots;
+ volatile apr_uint32_t idle_slots;
+
+ apr_threadattr_t *thread_attr;
+ h2_slot *slots;
+
+ APR_RING_HEAD(h2_slots_free, h2_slot) free;
+ APR_RING_HEAD(h2_slots_idle, h2_slot) idle;
+ APR_RING_HEAD(h2_slots_busy, h2_slot) busy;
+ APR_RING_HEAD(h2_slots_zombie, h2_slot) zombie;
+
+ APR_RING_HEAD(ap_conn_producer_active, ap_conn_producer_t) prod_active;
+ APR_RING_HEAD(ap_conn_producer_idle, ap_conn_producer_t) prod_idle;
+
+ struct apr_thread_mutex_t *lock;
+ struct apr_thread_cond_t *prod_done;
+ struct apr_thread_cond_t *all_done;
+};
-static void push_slot(h2_slot *volatile *phead, h2_slot *slot)
-{
- /* Atomically push a slot to the list */
- ap_assert(!slot->next);
- for (;;) {
- h2_slot *next = slot->next = *phead;
- if (apr_atomic_casptr((void*)phead, slot, next) == next) {
- return;
- }
- }
-}
static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
-static void slot_done(h2_slot *slot);
-static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
+static apr_status_t activate_slot(h2_workers *workers)
{
+ h2_slot *slot;
+ apr_pool_t *pool;
apr_status_t rv;
-
- slot->workers = workers;
- slot->connection = NULL;
- apr_thread_mutex_lock(workers->lock);
- if (!slot->lock) {
- rv = apr_thread_mutex_create(&slot->lock,
- APR_THREAD_MUTEX_DEFAULT,
- workers->pool);
- if (rv != APR_SUCCESS) goto cleanup;
+ if (APR_RING_EMPTY(&workers->free, h2_slot, link)) {
+ return APR_EAGAIN;
}
+ slot = APR_RING_FIRST(&workers->free);
+ ap_assert(slot->state == H2_SLOT_FREE);
+ APR_RING_REMOVE(slot, link);
- if (!slot->not_idle) {
- rv = apr_thread_cond_create(&slot->not_idle, workers->pool);
- if (rv != APR_SUCCESS) goto cleanup;
- }
-
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: new thread for slot %d", slot->id);
+ "h2_workers: activate slot %d", slot->id);
+
+ slot->state = H2_SLOT_RUN;
+ slot->should_shutdown = 0;
+ slot->is_idle = 0;
+ slot->pool = NULL;
+ ++workers->active_slots;
+ rv = apr_pool_create(&pool, workers->pool);
+ if (APR_SUCCESS != rv) goto cleanup;
+ apr_pool_tag(pool, "h2_worker_slot");
+ slot->pool = pool;
- /* thread will either immediately start work or add itself
- * to the idle queue */
- apr_atomic_inc32(&workers->worker_count);
- apr_atomic_set32(&slot->timed_out, 0);
rv = ap_thread_create(&slot->thread, workers->thread_attr,
- slot_run, slot, workers->pool);
- if (rv != APR_SUCCESS) {
- apr_atomic_dec32(&workers->worker_count);
- }
+ slot_run, slot, slot->pool);
cleanup:
- apr_thread_mutex_unlock(workers->lock);
if (rv != APR_SUCCESS) {
- push_slot(&workers->free, slot);
- }
- return rv;
-}
-
-static apr_status_t add_worker(h2_workers *workers)
-{
- h2_slot *slot = pop_slot(&workers->free);
- if (slot) {
- return activate_slot(workers, slot);
- }
- return APR_EAGAIN;
-}
-
-static void wake_idle_worker(h2_workers *workers)
-{
- h2_slot *slot;;
- for (;;) {
- slot = pop_slot(&workers->idle);
- if (!slot) {
- if (workers->dynamic && apr_atomic_read32(&workers->shutdown) == 0) {
- add_worker(workers);
- }
- return;
- }
- if (!apr_atomic_read32(&slot->timed_out)) {
- apr_thread_mutex_lock(slot->lock);
- apr_thread_cond_signal(slot->not_idle);
- apr_thread_mutex_unlock(slot->lock);
- return;
+ AP_DEBUG_ASSERT(0);
+ slot->state = H2_SLOT_FREE;
+ if (slot->pool) {
+ apr_pool_destroy(slot->pool);
+ slot->pool = NULL;
}
- slot_done(slot);
+ APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
+ --workers->active_slots;
}
+ return rv;
}
static void join_zombies(h2_workers *workers)
{
h2_slot *slot;
- while ((slot = pop_slot(&workers->zombies))) {
- apr_status_t status;
+ apr_status_t status;
+
+ while (!APR_RING_EMPTY(&workers->zombie, h2_slot, link)) {
+ slot = APR_RING_FIRST(&workers->zombie);
+ APR_RING_REMOVE(slot, link);
+ ap_assert(slot->state == H2_SLOT_ZOMBIE);
ap_assert(slot->thread != NULL);
+
+ apr_thread_mutex_unlock(workers->lock);
apr_thread_join(&status, slot->thread);
- slot->thread = NULL;
+ apr_thread_mutex_lock(workers->lock);
- push_slot(&workers->free, slot);
+ slot->thread = NULL;
+ slot->state = H2_SLOT_FREE;
+ if (slot->pool) {
+ apr_pool_destroy(slot->pool);
+ slot->pool = NULL;
+ }
+ APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
}
}
-static apr_status_t slot_pull_c2(h2_slot *slot, h2_mplx *m)
+static void wake_idle_worker(h2_workers *workers, ap_conn_producer_t *prod)
{
- apr_status_t rv;
-
- rv = h2_mplx_worker_pop_c2(m, &slot->connection);
- if (slot->connection) {
- return rv;
+ if (!APR_RING_EMPTY(&workers->idle, h2_slot, link)) {
+ h2_slot *slot;
+ for (slot = APR_RING_FIRST(&workers->idle);
+ slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
+ slot = APR_RING_NEXT(slot, link)) {
+ if (slot->is_idle && !slot->should_shutdown) {
+ apr_thread_cond_signal(slot->more_work);
+ slot->is_idle = 0;
+ return;
+ }
+ }
+ }
+ if (workers->dynamic && !workers->shutdown
+ && (workers->active_slots < workers->max_slots)) {
+ activate_slot(workers);
}
- return APR_EOF;
-}
-
-static h2_fifo_op_t mplx_peek(void *head, void *ctx)
-{
- h2_mplx *m = head;
- h2_slot *slot = ctx;
-
- if (slot_pull_c2(slot, m) == APR_EAGAIN) {
- wake_idle_worker(slot->workers);
- return H2_FIFO_OP_REPUSH;
- }
- return H2_FIFO_OP_PULL;
}
/**
- * Get the next c2 for the given worker. Will block until a c2 arrives
- * or the max_wait timer expires and more than min workers exist.
+ * Get the next connection to work on.
*/
-static int get_next(h2_slot *slot)
+static conn_rec *get_next(h2_slot *slot)
{
h2_workers *workers = slot->workers;
- int non_essential = slot->id >= workers->min_workers;
- apr_status_t rv;
-
- while (apr_atomic_read32(&workers->aborted) == 0
- && apr_atomic_read32(&slot->timed_out) == 0) {
- ap_assert(slot->connection == NULL);
- if (non_essential && apr_atomic_read32(&workers->shutdown)) {
- /* Terminate non-essential worker on shutdown */
- break;
+ conn_rec *c = NULL;
+ ap_conn_producer_t *prod;
+ int has_more;
+
+ slot->prod = NULL;
+ if (!APR_RING_EMPTY(&workers->prod_active, ap_conn_producer_t, link)) {
+ slot->prod = prod = APR_RING_FIRST(&workers->prod_active);
+ APR_RING_REMOVE(prod, link);
+ AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state);
+
+ c = prod->fn_next(prod->baton, &has_more);
+ if (c && has_more) {
+ APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
+ wake_idle_worker(workers, slot->prod);
}
- if (h2_fifo_try_peek(workers->mplxs, mplx_peek, slot) == APR_EOF) {
- /* The queue is terminated with the MPM child being cleaned up,
- * just leave. */
- break;
+ else {
+ prod->state = PROD_IDLE;
+ APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
}
- if (slot->connection) {
- return 1;
+ if (c) {
+ ++prod->conns_active;
}
-
- join_zombies(workers);
-
- apr_thread_mutex_lock(slot->lock);
- if (apr_atomic_read32(&workers->aborted) == 0) {
- apr_uint32_t idle_secs;
-
- push_slot(&workers->idle, slot);
- if (non_essential
- && (idle_secs = apr_atomic_read32(&workers->max_idle_secs))) {
- rv = apr_thread_cond_timedwait(slot->not_idle, slot->lock,
- apr_time_from_sec(idle_secs));
- if (APR_TIMEUP == rv) {
- apr_atomic_set32(&slot->timed_out, 1);
- }
- }
- else {
- apr_thread_cond_wait(slot->not_idle, slot->lock);
- }
- }
- apr_thread_mutex_unlock(slot->lock);
}
- return 0;
+ return c;
}
-static void slot_done(h2_slot *slot)
+static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
{
+ h2_slot *slot = wctx;
h2_workers *workers = slot->workers;
+ conn_rec *c;
+ apr_status_t rv;
- push_slot(&workers->zombies, slot);
+ apr_thread_mutex_lock(workers->lock);
+ slot->state = H2_SLOT_RUN;
+ ++slot->activations;
+ APR_RING_ELEM_INIT(slot, link);
+ for(;;) {
+ if (APR_RING_NEXT(slot, link) != slot) {
+ /* slot is part of the idle ring from the last loop */
+ APR_RING_REMOVE(slot, link);
+ --workers->idle_slots;
+ }
+ slot->is_idle = 0;
+
+ if (!workers->aborted && !slot->should_shutdown) {
+ APR_RING_INSERT_TAIL(&workers->busy, slot, h2_slot, link);
+ do {
+ c = get_next(slot);
+ if (!c) {
+ break;
+ }
+ apr_thread_mutex_unlock(workers->lock);
+ /* See the discussion at <https://github.com/icing/mod_h2/issues/195>
+ *
+ * Each conn_rec->id is supposed to be unique at a point in time. Since
+ * some modules (and maybe external code) uses this id as an identifier
+ * for the request_rec they handle, it needs to be unique for secondary
+ * connections also.
+ *
+ * The MPM module assigns the connection ids and mod_unique_id is using
+ * that one to generate identifier for requests. While the implementation
+ * works for HTTP/1.x, the parallel execution of several requests per
+ * connection will generate duplicate identifiers on load.
+ *
+ * The original implementation for secondary connection identifiers used
+ * to shift the master connection id up and assign the stream id to the
+ * lower bits. This was cramped on 32 bit systems, but on 64bit there was
+ * enough space.
+ *
+ * As issue 195 showed, mod_unique_id only uses the lower 32 bit of the
+ * connection id, even on 64bit systems. Therefore collisions in request ids.
+ *
+ * The way master connection ids are generated, there is some space "at the
+ * top" of the lower 32 bits on allmost all systems. If you have a setup
+ * with 64k threads per child and 255 child processes, you live on the edge.
+ *
+ * The new implementation shifts 8 bits and XORs in the worker
+ * id. This will experience collisions with > 256 h2 workers and heavy
+ * load still. There seems to be no way to solve this in all possible
+ * configurations by mod_h2 alone.
+ */
+ if (c->master) {
+ c->id = (c->master->id << 8)^slot->id;
+ }
+ c->current_thread = thread;
+ AP_DEBUG_ASSERT(slot->prod);
- /* If this worker is the last one exiting and the MPM child is stopping,
- * unblock workers_pool_cleanup().
- */
- if (!apr_atomic_dec32(&workers->worker_count)
- && apr_atomic_read32(&workers->aborted)) {
- apr_thread_mutex_lock(workers->lock);
- apr_thread_cond_signal(workers->all_done);
- apr_thread_mutex_unlock(workers->lock);
- }
-}
+ ap_process_connection(c, ap_get_conn_socket(c));
+ slot->prod->fn_done(slot->prod->baton, c);
+ apr_thread_mutex_lock(workers->lock);
+ if (--slot->prod->conns_active <= 0) {
+ apr_thread_cond_broadcast(workers->prod_done);
+ }
+ if (slot->prod->state == PROD_IDLE) {
+ APR_RING_REMOVE(slot->prod, link);
+ slot->prod->state = PROD_ACTIVE;
+ APR_RING_INSERT_TAIL(&workers->prod_active, slot->prod, ap_conn_producer_t, link);
+ }
-static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
-{
- h2_slot *slot = wctx;
- conn_rec *c;
-
- /* Get the next c2 from mplx to process. */
- while (get_next(slot)) {
- /* See the discussion at <https://github.com/icing/mod_h2/issues/195>
- *
- * Each conn_rec->id is supposed to be unique at a point in time. Since
- * some modules (and maybe external code) uses this id as an identifier
- * for the request_rec they handle, it needs to be unique for secondary
- * connections also.
- *
- * The MPM module assigns the connection ids and mod_unique_id is using
- * that one to generate identifier for requests. While the implementation
- * works for HTTP/1.x, the parallel execution of several requests per
- * connection will generate duplicate identifiers on load.
- *
- * The original implementation for secondary connection identifiers used
- * to shift the master connection id up and assign the stream id to the
- * lower bits. This was cramped on 32 bit systems, but on 64bit there was
- * enough space.
- *
- * As issue 195 showed, mod_unique_id only uses the lower 32 bit of the
- * connection id, even on 64bit systems. Therefore collisions in request ids.
- *
- * The way master connection ids are generated, there is some space "at the
- * top" of the lower 32 bits on allmost all systems. If you have a setup
- * with 64k threads per child and 255 child processes, you live on the edge.
- *
- * The new implementation shifts 8 bits and XORs in the worker
- * id. This will experience collisions with > 256 h2 workers and heavy
- * load still. There seems to be no way to solve this in all possible
- * configurations by mod_h2 alone.
- */
- AP_DEBUG_ASSERT(slot->connection != NULL);
- c = slot->connection;
- slot->connection = NULL;
- c->id = (c->master->id << 8)^slot->id;
- c->current_thread = thread;
+ } while (!workers->aborted && !slot->should_shutdown);
+ APR_RING_REMOVE(slot, link); /* no longer busy */
+ }
- ap_process_connection(c, ap_get_conn_socket(c));
+ if (workers->aborted || slot->should_shutdown) {
+ break;
+ }
- h2_mplx_worker_c2_done(c);
+ join_zombies(workers);
+
+ /* we are idle */
+ APR_RING_INSERT_TAIL(&workers->idle, slot, h2_slot, link);
+ ++workers->idle_slots;
+ slot->is_idle = 1;
+ if (slot->id >= workers->min_active && workers->idle_limit) {
+ rv = apr_thread_cond_timedwait(slot->more_work, workers->lock,
+ workers->idle_limit);
+ if (APR_TIMEUP == rv) {
+ APR_RING_REMOVE(slot, link);
+ --workers->idle_slots;
+ ap_log_error(APLOG_MARK, APLOG_ERR, 0, workers->s,
+ "h2_workers: idle timeout slot %d in state %d (%d activations)",
+ slot->id, slot->state, slot->activations);
+ break;
+ }
+ }
+ else {
+ apr_thread_cond_wait(slot->more_work, workers->lock);
+ }
}
- if (apr_atomic_read32(&slot->timed_out) == 0) {
- slot_done(slot);
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
+ "h2_workers: terminate slot %d in state %d (%d activations)",
+ slot->id, slot->state, slot->activations);
+ slot->is_idle = 0;
+ slot->state = H2_SLOT_ZOMBIE;
+ slot->should_shutdown = 0;
+ APR_RING_INSERT_TAIL(&workers->zombie, slot, h2_slot, link);
+ --workers->active_slots;
+ if (workers->active_slots <= 0) {
+ apr_thread_cond_broadcast(workers->all_done);
}
+ apr_thread_mutex_unlock(workers->lock);
apr_thread_exit(thread, APR_SUCCESS);
return NULL;
}
-static void wake_non_essential_workers(h2_workers *workers)
+static void wake_all_idles(h2_workers *workers)
{
h2_slot *slot;
- /* pop all idle, signal the non essentials and add the others again */
- if ((slot = pop_slot(&workers->idle))) {
- wake_non_essential_workers(workers);
- if (slot->id > workers->min_workers) {
- apr_thread_mutex_lock(slot->lock);
- apr_thread_cond_signal(slot->not_idle);
- apr_thread_mutex_unlock(slot->lock);
- }
- else {
- push_slot(&workers->idle, slot);
- }
- }
-}
-
-static void workers_abort_idle(h2_workers *workers)
-{
- h2_slot *slot;
-
- apr_atomic_set32(&workers->shutdown, 1);
- apr_atomic_set32(&workers->aborted, 1);
- h2_fifo_term(workers->mplxs);
-
- /* abort all idle slots */
- while ((slot = pop_slot(&workers->idle))) {
- apr_thread_mutex_lock(slot->lock);
- apr_thread_cond_signal(slot->not_idle);
- apr_thread_mutex_unlock(slot->lock);
+ for (slot = APR_RING_FIRST(&workers->idle);
+ slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
+ slot = APR_RING_NEXT(slot, link))
+ {
+ apr_thread_cond_signal(slot->more_work);
}
}
@@ -347,37 +367,47 @@ static apr_status_t workers_pool_cleanup(void *data)
int n, wait_sec = 5;
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
- "h2_workers: cleanup %d workers idling",
- (int)apr_atomic_read32(&workers->worker_count));
- workers_abort_idle(workers);
+ "h2_workers: cleanup %d workers (%d idle)",
+ workers->active_slots, workers->idle_slots);
+ apr_thread_mutex_lock(workers->lock);
+ workers->shutdown = 1;
+ workers->aborted = 1;
+ wake_all_idles(workers);
+ apr_thread_mutex_unlock(workers->lock);
/* wait for all the workers to become zombies and join them.
* this gets called after the mpm shuts down and all connections
* have either been handled (graceful) or we are forced exiting
* (ungrateful). Either way, we show limited patience. */
- apr_thread_mutex_lock(workers->lock);
end = apr_time_now() + apr_time_from_sec(wait_sec);
- while ((n = apr_atomic_read32(&workers->worker_count)) > 0
- && apr_time_now() < end) {
+ while (apr_time_now() < end) {
+ apr_thread_mutex_lock(workers->lock);
+ if (!(n = workers->active_slots)) {
+ apr_thread_mutex_unlock(workers->lock);
+ break;
+ }
+ wake_all_idles(workers);
rv = apr_thread_cond_timedwait(workers->all_done, workers->lock, timeout);
+ apr_thread_mutex_unlock(workers->lock);
+
if (APR_TIMEUP == rv) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
- APLOGNO(10290) "h2_workers: waiting for idle workers to close, "
- "still seeing %d workers living",
- apr_atomic_read32(&workers->worker_count));
- continue;
+ APLOGNO(10290) "h2_workers: waiting for workers to close, "
+ "still seeing %d workers (%d idle) living",
+ workers->active_slots, workers->idle_slots);
}
}
if (n) {
ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
- APLOGNO(10291) "h2_workers: cleanup, %d idle workers "
+ APLOGNO(10291) "h2_workers: cleanup, %d workers (%d idle) "
"did not exit after %d seconds.",
- n, wait_sec);
+ n, workers->idle_slots, wait_sec);
}
- apr_thread_mutex_unlock(workers->lock);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
"h2_workers: cleanup all workers terminated");
+ apr_thread_mutex_lock(workers->lock);
join_zombies(workers);
+ apr_thread_mutex_unlock(workers->lock);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
"h2_workers: cleanup zombie workers joined");
@@ -385,13 +415,13 @@ static apr_status_t workers_pool_cleanup(void *data)
}
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
- int min_workers, int max_workers,
- int idle_secs)
+ int max_slots, int min_active, apr_time_t idle_limit)
{
apr_status_t rv;
h2_workers *workers;
apr_pool_t *pool;
- int i, n;
+ apr_allocator_t *allocator;
+ int i, locked = 0;
ap_assert(s);
ap_assert(pchild);
@@ -401,7 +431,16 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
* guarded by our lock. Without this pool, all subpool creations would
* happen on the pool handed to us, which we do not guard.
*/
- apr_pool_create(&pool, pchild);
+ rv = apr_allocator_create(&allocator);
+ if (rv != APR_SUCCESS) {
+ goto cleanup;
+ }
+ rv = apr_pool_create_ex(&pool, pchild, NULL, allocator);
+ if (rv != APR_SUCCESS) {
+ apr_allocator_destroy(allocator);
+ goto cleanup;
+ }
+ apr_allocator_owner_set(allocator, pool);
apr_pool_tag(pool, "h2_workers");
workers = apr_pcalloc(pool, sizeof(h2_workers));
if (!workers) {
@@ -410,19 +449,23 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
workers->s = s;
workers->pool = pool;
- workers->min_workers = min_workers;
- workers->max_workers = max_workers;
- workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
+ workers->min_active = min_active;
+ workers->max_slots = max_slots;
+ workers->idle_limit = (idle_limit > 0)? idle_limit : apr_time_from_sec(10);
+ workers->dynamic = (workers->min_active < workers->max_slots);
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
- "h2_workers: created with min=%d max=%d idle_timeout=%d sec",
- workers->min_workers, workers->max_workers,
- (int)workers->max_idle_secs);
- /* FIXME: the fifo set we use here has limited capacity. Once the
- * set is full, connections with new requests do a wait.
- */
- rv = h2_fifo_set_create(&workers->mplxs, pool, 16 * 1024);
- if (rv != APR_SUCCESS) goto cleanup;
+ ap_log_error(APLOG_MARK, APLOG_INFO, 0, workers->s,
+ "h2_workers: created with min=%d max=%d idle_ms=%d",
+ workers->min_active, workers->max_slots,
+ (int)apr_time_as_msec(idle_limit));
+
+ APR_RING_INIT(&workers->idle, h2_slot, link);
+ APR_RING_INIT(&workers->busy, h2_slot, link);
+ APR_RING_INIT(&workers->free, h2_slot, link);
+ APR_RING_INIT(&workers->zombie, h2_slot, link);
+
+ APR_RING_INIT(&workers->prod_active, ap_conn_producer_t, link);
+ APR_RING_INIT(&workers->prod_idle, ap_conn_producer_t, link);
rv = apr_threadattr_create(&workers->thread_attr, workers->pool);
if (rv != APR_SUCCESS) goto cleanup;
@@ -441,32 +484,35 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
if (rv != APR_SUCCESS) goto cleanup;
rv = apr_thread_cond_create(&workers->all_done, workers->pool);
if (rv != APR_SUCCESS) goto cleanup;
+ rv = apr_thread_cond_create(&workers->prod_done, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
- n = workers->nslots = workers->max_workers;
- workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
- if (workers->slots == NULL) {
- n = workers->nslots = 0;
- rv = APR_ENOMEM;
- goto cleanup;
- }
- for (i = 0; i < n; ++i) {
+ apr_thread_mutex_lock(workers->lock);
+ locked = 1;
+
+ /* create the slots and put them on the free list */
+ workers->slots = apr_pcalloc(workers->pool, workers->max_slots * sizeof(h2_slot));
+
+ for (i = 0; i < workers->max_slots; ++i) {
workers->slots[i].id = i;
- }
- /* we activate all for now, TODO: support min_workers again.
- * do this in reverse for vanity reasons so slot 0 will most
- * likely be at head of idle queue. */
- n = workers->min_workers;
- for (i = n-1; i >= 0; --i) {
- rv = activate_slot(workers, &workers->slots[i]);
+ workers->slots[i].state = H2_SLOT_FREE;
+ workers->slots[i].workers = workers;
+ APR_RING_ELEM_INIT(&workers->slots[i], link);
+ APR_RING_INSERT_TAIL(&workers->free, &workers->slots[i], h2_slot, link);
+ rv = apr_thread_cond_create(&workers->slots[i].more_work, workers->pool);
if (rv != APR_SUCCESS) goto cleanup;
}
- /* the rest of the slots go on the free list */
- for(i = n; i < workers->nslots; ++i) {
- push_slot(&workers->free, &workers->slots[i]);
+
+ /* activate the min amount of workers */
+ for (i = 0; i < workers->min_active; ++i) {
+ rv = activate_slot(workers);
+ if (rv != APR_SUCCESS) goto cleanup;
}
- workers->dynamic = (workers->worker_count < workers->max_workers);
cleanup:
+ if (locked) {
+ apr_thread_mutex_unlock(workers->lock);
+ }
if (rv == APR_SUCCESS) {
/* Stop/join the workers threads when the MPM child exits (pchild is
* destroyed), and as a pre_cleanup of pchild thus before the threads
@@ -476,24 +522,84 @@ cleanup:
apr_pool_pre_cleanup_register(pchild, workers, workers_pool_cleanup);
return workers;
}
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, workers->s,
+ "h2_workers: errors initializing");
return NULL;
}
-apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
+apr_size_t h2_workers_get_max_workers(h2_workers *workers)
{
- apr_status_t status = h2_fifo_push(workers->mplxs, m);
- wake_idle_worker(workers);
- return status;
+ return workers->max_slots;
}
-apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
+void h2_workers_graceful_shutdown(h2_workers *workers)
{
- return h2_fifo_remove(workers->mplxs, m);
+ apr_thread_mutex_lock(workers->lock);
+ workers->shutdown = 1;
+ workers->idle_limit = apr_time_from_sec(1);
+ wake_all_idles(workers);
+ apr_thread_mutex_unlock(workers->lock);
}
-void h2_workers_graceful_shutdown(h2_workers *workers)
+ap_conn_producer_t *h2_workers_register(h2_workers *workers,
+ apr_pool_t *producer_pool,
+ const char *name,
+ ap_conn_producer_next *fn_next,
+ ap_conn_producer_done *fn_done,
+ void *baton)
{
- apr_atomic_set32(&workers->shutdown, 1);
- apr_atomic_set32(&workers->max_idle_secs, 1);
- wake_non_essential_workers(workers);
+ ap_conn_producer_t *prod;
+
+ prod = apr_pcalloc(producer_pool, sizeof(*prod));
+ APR_RING_ELEM_INIT(prod, link);
+ prod->name = name;
+ prod->fn_next = fn_next;
+ prod->fn_done = fn_done;
+ prod->baton = baton;
+
+ apr_thread_mutex_lock(workers->lock);
+ prod->state = PROD_IDLE;
+ APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
+ apr_thread_mutex_unlock(workers->lock);
+
+ return prod;
+}
+
+apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *prod)
+{
+ apr_status_t rv = APR_SUCCESS;
+
+ apr_thread_mutex_lock(workers->lock);
+ if (PROD_JOINED == prod->state) {
+ AP_DEBUG_ASSERT(APR_RING_NEXT(prod, link) == prod); /* should be in no ring */
+ rv = APR_EINVAL;
+ }
+ else {
+ AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state || PROD_IDLE == prod->state);
+ APR_RING_REMOVE(prod, link);
+ prod->state = PROD_JOINED; /* prevent further activations */
+ while (prod->conns_active > 0) {
+ apr_thread_cond_wait(workers->prod_done, workers->lock);
+ }
+ APR_RING_ELEM_INIT(prod, link); /* make it link to itself */
+ }
+ apr_thread_mutex_unlock(workers->lock);
+ return rv;
+}
+
+apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *prod)
+{
+ apr_status_t rv = APR_SUCCESS;
+ apr_thread_mutex_lock(workers->lock);
+ if (PROD_IDLE == prod->state) {
+ APR_RING_REMOVE(prod, link);
+ prod->state = PROD_ACTIVE;
+ APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
+ wake_idle_worker(workers, prod);
+ }
+ else if (PROD_JOINED == prod->state) {
+ rv = APR_EINVAL;
+ }
+ apr_thread_mutex_unlock(workers->lock);
+ return rv;
}
diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h
index 0de3040676..20169a0d50 100644
--- a/modules/http2/h2_workers.h
+++ b/modules/http2/h2_workers.h
@@ -28,59 +28,94 @@ struct h2_mplx;
struct h2_request;
struct h2_fifo;
-struct h2_slot;
-
typedef struct h2_workers h2_workers;
-struct h2_workers {
- server_rec *s;
- apr_pool_t *pool;
-
- int next_worker_id;
- apr_uint32_t max_workers;
- apr_uint32_t min_workers;
- /* atomic */ apr_uint32_t worker_count;
- /* atomic */ apr_uint32_t max_idle_secs;
- /* atomic */ apr_uint32_t aborted;
- /* atomic */ apr_uint32_t shutdown;
- int dynamic;
- apr_threadattr_t *thread_attr;
- int nslots;
- struct h2_slot *slots;
+/**
+ * Create a worker set with a maximum number of 'slots', e.g. worker
+ * threads to run. Always keep `min_active` workers running. Shutdown
+ * any additional workers after `idle_secs` seconds of doing nothing.
+ *
+ * @oaram s the base server
+ * @param pool for allocations
+ * @param min_active minimum number of workers to run
+ * @param max_slots maximum number of worker slots
+ * @param idle_limit upper duration of idle after a non-minimal slots shuts down
+ */
+h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
+ int max_slots, int min_active, apr_time_t idle_limit);
- struct h2_slot *free;
- struct h2_slot *idle;
- struct h2_slot *zombies;
-
- struct h2_fifo *mplxs;
-
- struct apr_thread_mutex_t *lock;
- struct apr_thread_cond_t *all_done;
-};
+/**
+ * Shut down processing gracefully by terminating all idle workers.
+ */
+void h2_workers_graceful_shutdown(h2_workers *workers);
+/**
+ * Get the maximum number of workers.
+ */
+apr_size_t h2_workers_get_max_workers(h2_workers *workers);
-/* Create a worker pool with the given minimum and maximum number of
- * threads.
+/**
+ * ap_conn_producer_t is the source of connections (conn_rec*) to run.
+ *
+ * Active producers are queried by idle workers for connections.
+ * If they do not hand one back, they become inactive and are not
+ * queried further. `h2_workers_activate()` places them on the active
+ * list again.
+ *
+ * A producer finishing MUST call `h2_workers_join()` which removes
+ * it completely from workers processing and waits for all ongoing
+ * work for this producer to be done.
*/
-h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
- int min_size, int max_size, int idle_secs);
+typedef struct ap_conn_producer_t ap_conn_producer_t;
/**
- * Registers a h2_mplx for scheduling. If this h2_mplx runs
- * out of work, it will be automatically be unregistered. Should
- * new work arrive, it needs to be registered again.
+ * Ask a producer for the next connection to process.
+ * @param baton value from producer registration
+ * @param pconn holds the connection to process on return
+ * @param pmore if the producer has more connections that may be retrieved
+ * @return APR_SUCCESS for a connection to process, APR_EAGAIN for no
+ * connection being available at the time.
*/
-apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m);
+typedef conn_rec *ap_conn_producer_next(void *baton, int *pmore);
/**
- * Remove a h2_mplx from the worker registry.
+ * Tell the producer that processing the connection is done.
+ * @param baton value from producer registration
+ * @param conn the connection that has been processed.
*/
-apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m);
+typedef void ap_conn_producer_done(void *baton, conn_rec *conn);
/**
- * Shut down processing gracefully by terminating all idle workers.
+ * Register a new producer with the given `baton` and callback functions.
+ * Will allocate internal structures from the given pool (but make no use
+ * of the pool after registration).
+ * Producers are inactive on registration. See `h2_workers_activate()`.
+ * @param producer_pool to allocate the producer from
+ * @param name descriptive name of the producer, must not be unique
+ * @param fn_next callback for retrieving connections to process
+ * @param fn_done callback for processed connections
+ * @param baton provided value passed on in callbacks
+ * @return the producer instance created
*/
-void h2_workers_graceful_shutdown(h2_workers *workers);
+ap_conn_producer_t *h2_workers_register(h2_workers *workers,
+ apr_pool_t *producer_pool,
+ const char *name,
+ ap_conn_producer_next *fn_next,
+ ap_conn_producer_done *fn_done,
+ void *baton);
+
+/**
+ * Stop retrieving more connection from the producer and wait
+ * for all ongoing for from that producer to be done.
+ */
+apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *producer);
+
+/**
+ * Activate a producer. A worker will query the producer for a connection
+ * to process, once a worker is available.
+ * This may be called, irregardless of the producers active/inactive.
+ */
+apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *producer);
#endif /* defined(__mod_h2__h2_workers__) */
diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c
index 36acc432f9..a4800c148b 100644
--- a/modules/http2/mod_http2.c
+++ b/modules/http2/mod_http2.c
@@ -150,7 +150,9 @@ static int http2_is_h2(conn_rec *);
static void http2_get_num_workers(server_rec *s, int *minw, int *maxw)
{
- h2_get_num_workers(s, minw, maxw);
+ apr_time_t tdummy;
+
+ h2_get_workers_config(s, minw, maxw, &tdummy);
}
/* Runs once per created child process. Perform any process