diff options
author | Stefan Eissing <icing@apache.org> | 2017-01-13 16:32:57 +0000 |
---|---|---|
committer | Stefan Eissing <icing@apache.org> | 2017-01-13 16:32:57 +0000 |
commit | f6e502d24058643248ac49ad84f0a4ece4a62045 (patch) | |
tree | 6eee69c103686027c937e650b2e1f2d79198bffb | |
parent | c836392fb64631458ed7491bf1b0c8b605b6054b (diff) | |
download | httpd-f6e502d24058643248ac49ad84f0a4ece4a62045.tar.gz |
On the 2.4.x branch: merge of r1701609-1705681 from trunk.
*) mod_http2: streaming of request output now reacts timely to data
from other streams becoming available. Same for new incoming requests.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1778629 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | CHANGES | 10 | ||||
-rw-r--r-- | modules/http2/h2_bucket_beam.c | 80 | ||||
-rw-r--r-- | modules/http2/h2_bucket_beam.h | 42 | ||||
-rw-r--r-- | modules/http2/h2_config.c | 20 | ||||
-rw-r--r-- | modules/http2/h2_config.h | 2 | ||||
-rw-r--r-- | modules/http2/h2_conn_io.c | 73 | ||||
-rw-r--r-- | modules/http2/h2_conn_io.h | 9 | ||||
-rw-r--r-- | modules/http2/h2_mplx.c | 62 | ||||
-rw-r--r-- | modules/http2/h2_mplx.h | 7 | ||||
-rw-r--r-- | modules/http2/h2_session.c | 64 | ||||
-rw-r--r-- | modules/http2/h2_stream.c | 1 | ||||
-rw-r--r-- | modules/http2/h2_version.h | 4 |
12 files changed, 207 insertions, 167 deletions
@@ -2,9 +2,13 @@ Changes with Apache 2.4.26 - *) mod_proxy_fcgi, mod_fcgid: Fix crashes in ap_fcgi_encoded_env_len() when - modules add empty environment variables to the request. PR60275. - [<alex2grad AT gmail.com>] + *) mod_http2: streaming of request output now reacts timely to data + from other streams becoming available. Same for new incoming requests. + [Stefan Eissing] + + *) mod_proxy_fcgi, mod_fcgid: Fix crashes in ap_fcgi_encoded_env_len() when + modules add empty environment variables to the request. PR60275. + [<alex2grad AT gmail.com>] *) mod_http2: fix for possible page fault when stream is resumed during session shutdown. [sidney-j-r-m (github)] diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index add3065c97..39927fac4d 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -14,6 +14,7 @@ */ #include <apr_lib.h> +#include <apr_atomic.h> #include <apr_strings.h> #include <apr_time.h> #include <apr_buckets.h> @@ -243,25 +244,29 @@ static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl) } } -static void report_consumption(h2_bucket_beam *beam, int force) +static int report_consumption(h2_bucket_beam *beam) { - if (force || beam->received_bytes != beam->reported_consumed_bytes) { - if (beam->consumed_fn) { - beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes - - beam->reported_consumed_bytes); + int rv = 0; + if (apr_atomic_read32(&beam->cons_ev_pending)) { + if (beam->cons_io_cb) { + beam->cons_io_cb(beam->cons_ctx, beam, beam->received_bytes + - beam->cons_bytes_reported); + rv = 1; } - beam->reported_consumed_bytes = beam->received_bytes; + beam->cons_bytes_reported = beam->received_bytes; + apr_atomic_set32(&beam->cons_ev_pending, 0); } + return rv; } -static void report_production(h2_bucket_beam *beam, int force) +static void report_prod_io(h2_bucket_beam *beam, int force) { - if (force || beam->sent_bytes != beam->reported_produced_bytes) { - if (beam->produced_fn) { - beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes - - beam->reported_produced_bytes); + if (force || beam->prod_bytes_reported != beam->sent_bytes) { + if (beam->prod_io_cb) { + beam->prod_io_cb(beam->prod_ctx, beam, beam->sent_bytes + - beam->prod_bytes_reported); } - beam->reported_produced_bytes = beam->sent_bytes; + beam->prod_bytes_reported = beam->sent_bytes; } } @@ -322,7 +327,7 @@ static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block, while (!beam->aborted && *premain <= 0 && (block == APR_BLOCK_READ) && pbl->mutex) { apr_status_t status; - report_production(beam, 1); + report_prod_io(beam, 1); status = wait_cond(beam, pbl->mutex); if (APR_STATUS_IS_TIMEUP(status)) { return status; @@ -453,7 +458,7 @@ static apr_status_t beam_send_cleanup(void *data) /* sender has gone away, clear up all references to its memory */ r_purge_sent(beam); h2_blist_cleanup(&beam->send_list); - report_consumption(beam, 0); + report_consumption(beam); while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) { h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies); H2_BPROXY_REMOVE(proxy); @@ -634,7 +639,7 @@ void h2_beam_abort(h2_bucket_beam *beam) beam->aborted = 1; r_purge_sent(beam); h2_blist_cleanup(&beam->send_list); - report_consumption(beam, 0); + report_consumption(beam); } if (beam->m_cond) { apr_thread_cond_broadcast(beam->m_cond); @@ -650,7 +655,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam) if (enter_yellow(beam, &bl) == APR_SUCCESS) { r_purge_sent(beam); beam_close(beam); - report_consumption(beam, 0); + report_consumption(beam); leave_yellow(beam, &bl); } return beam->aborted? APR_ECONNABORTED : APR_SUCCESS; @@ -851,12 +856,12 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, b = APR_BRIGADE_FIRST(red_brigade); status = append_bucket(beam, b, block, &bl); } - report_production(beam, force_report); + report_prod_io(beam, force_report); if (beam->m_cond) { apr_thread_cond_broadcast(beam->m_cond); } } - report_consumption(beam, 0); + report_consumption(beam); leave_yellow(beam, &bl); } return status; @@ -872,6 +877,7 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam, int transferred = 0; apr_status_t status = APR_SUCCESS; apr_off_t remain = readbytes; + int transferred_buckets = 0; /* Called from the green thread to take buckets from the beam */ if (enter_yellow(beam, &bl) == APR_SUCCESS) { @@ -968,6 +974,8 @@ transfer: APR_BUCKET_REMOVE(bred); H2_BLIST_INSERT_TAIL(&beam->hold_list, bred); beam->received_bytes += bred->length; + ++transferred_buckets; + if (bgreen) { APR_BRIGADE_INSERT_TAIL(bb, bgreen); remain -= bgreen->length; @@ -1000,6 +1008,13 @@ transfer: } } + if (transferred_buckets > 0) { + apr_atomic_set32(&beam->cons_ev_pending, 1); + if (beam->cons_ev_cb) { + beam->cons_ev_cb(beam->cons_ctx, beam); + } + } + if (beam->closed && (!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer)) && H2_BLIST_EMPTY(&beam->send_list)) { @@ -1042,25 +1057,25 @@ leave: } void h2_beam_on_consumed(h2_bucket_beam *beam, - h2_beam_io_callback *cb, void *ctx) + h2_beam_ev_callback *ev_cb, + h2_beam_io_callback *io_cb, void *ctx) { h2_beam_lock bl; - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - beam->consumed_fn = cb; - beam->consumed_ctx = ctx; + beam->cons_ev_cb = ev_cb; + beam->cons_io_cb = io_cb; + beam->cons_ctx = ctx; leave_yellow(beam, &bl); } } void h2_beam_on_produced(h2_bucket_beam *beam, - h2_beam_io_callback *cb, void *ctx) + h2_beam_io_callback *io_cb, void *ctx) { h2_beam_lock bl; - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - beam->produced_fn = cb; - beam->produced_ctx = ctx; + beam->prod_io_cb = io_cb; + beam->prod_ctx = ctx; leave_yellow(beam, &bl); } } @@ -1173,3 +1188,16 @@ int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file) return 0; } +int h2_beam_report_consumption(h2_bucket_beam *beam) +{ + if (apr_atomic_read32(&beam->cons_ev_pending)) { + h2_beam_lock bl; + if (enter_yellow(beam, &bl) == APR_SUCCESS) { + int rv = report_consumption(beam); + leave_yellow(beam, &bl); + return rv; + } + } + return 0; +} + diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h index 39fd476c20..a4ead053ce 100644 --- a/modules/http2/h2_bucket_beam.h +++ b/modules/http2/h2_bucket_beam.h @@ -154,6 +154,7 @@ typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl); typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam, apr_off_t bytes); +typedef void h2_beam_ev_callback(void *ctx, h2_bucket_beam *beam); typedef struct h2_beam_proxy h2_beam_proxy; typedef struct { @@ -205,12 +206,17 @@ struct h2_bucket_beam { h2_beam_mutex_enter *m_enter; struct apr_thread_cond_t *m_cond; - apr_off_t reported_consumed_bytes; /* amount of bytes reported as consumed */ - h2_beam_io_callback *consumed_fn; - void *consumed_ctx; - apr_off_t reported_produced_bytes; /* amount of bytes reported as produced */ - h2_beam_io_callback *produced_fn; - void *produced_ctx; + apr_uint32_t cons_ev_pending; /* != 0, consumer event pending */ + apr_off_t cons_bytes_reported; /* amount of bytes reported as consumed */ + h2_beam_ev_callback *cons_ev_cb; + h2_beam_io_callback *cons_io_cb; + void *cons_ctx; + + apr_uint32_t prod_ev_pending; /* != 0, producer event pending */ + apr_off_t prod_bytes_reported; /* amount of bytes reported as produced */ + h2_beam_io_callback *prod_io_cb; + void *prod_ctx; + h2_beam_can_beam_callback *can_beam_fn; void *can_beam_ctx; }; @@ -336,26 +342,38 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam); * amount of bytes that have been consumed by the receiver, since the * last callback invocation or reset. * @param beam the beam to set the callback on - * @param cb the callback or NULL + * @param ev_cb the callback or NULL, called when bytes are consumed + * @param io_cb the callback or NULL, called on sender with bytes consumed * @param ctx the context to use in callback invocation * - * Call from the sender side, callbacks invoked on sender side. + * Call from the sender side, io callbacks invoked on sender side, ev callback + * from any side. */ void h2_beam_on_consumed(h2_bucket_beam *beam, - h2_beam_io_callback *cb, void *ctx); + h2_beam_ev_callback *ev_cb, + h2_beam_io_callback *io_cb, void *ctx); + +/** + * Call any registered consumed handler, if any changes have happened + * since the last invocation. + * @return !=0 iff a handler has been called + * + * Needs to be invoked from the sending side. + */ +int h2_beam_report_consumption(h2_bucket_beam *beam); /** * Register a callback to be invoked on the receiver side with the * amount of bytes that have been produces by the sender, since the * last callback invocation or reset. * @param beam the beam to set the callback on - * @param cb the callback or NULL + * @param io_cb the callback or NULL, called on receiver with bytes produced * @param ctx the context to use in callback invocation * - * Call from the receiver side, callbacks invoked on receiver side. + * Call from the receiver side, callbacks invoked on either side. */ void h2_beam_on_produced(h2_bucket_beam *beam, - h2_beam_io_callback *cb, void *ctx); + h2_beam_io_callback *io_cb, void *ctx); void h2_beam_on_file_beam(h2_bucket_beam *beam, h2_beam_can_beam_callback *cb, void *ctx); diff --git a/modules/http2/h2_config.c b/modules/http2/h2_config.c index f91c8c257a..3ac969d768 100644 --- a/modules/http2/h2_config.c +++ b/modules/http2/h2_config.c @@ -64,7 +64,6 @@ static h2_config defconf = { 0, /* copy files across threads */ NULL, /* push list */ 0, /* early hints, http status 103 */ - 2, /* TLS records flush count */ }; void h2_config_init(apr_pool_t *pool) @@ -100,7 +99,6 @@ static void *h2_config_create(apr_pool_t *pool, conf->copy_files = DEF_VAL; conf->push_list = NULL; conf->early_hints = DEF_VAL; - conf->tls_flush_count = DEF_VAL; return conf; } @@ -153,7 +151,6 @@ static void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv) n->push_list = add->push_list? add->push_list : base->push_list; } n->early_hints = H2_CONFIG_GET(add, base, early_hints); - n->tls_flush_count = H2_CONFIG_GET(add, base, tls_flush_count); return n; } @@ -211,8 +208,6 @@ apr_int64_t h2_config_geti64(const h2_config *conf, h2_config_var_t var) return H2_CONFIG_GET(conf, &defconf, copy_files); case H2_CONF_EARLY_HINTS: return H2_CONFIG_GET(conf, &defconf, early_hints); - case H2_CONF_TLS_FLUSH_COUNT: - return H2_CONFIG_GET(conf, &defconf, tls_flush_count); default: return DEF_VAL; } @@ -314,7 +309,7 @@ static const char *h2_conf_set_stream_max_mem_size(cmd_parms *parms, static const char *h2_add_alt_svc(cmd_parms *parms, void *arg, const char *value) { - if (value && strlen(value)) { + if (value && *value) { h2_config *cfg = (h2_config *)h2_config_sget(parms->server); h2_alt_svc *as = h2_alt_svc_parse(value, parms->pool); if (!as) { @@ -412,7 +407,7 @@ static const char *h2_conf_add_push_priority(cmd_parms *cmd, void *_cfg, h2_priority *priority; int weight; - if (!strlen(ctype)) { + if (!*ctype) { return "1st argument must be a mime-type, like 'text/css' or '*'"; } @@ -510,15 +505,6 @@ static const char *h2_conf_set_tls_cooldown_secs(cmd_parms *parms, return NULL; } -static const char *h2_conf_set_tls_flush_count(cmd_parms *parms, - void *arg, const char *value) -{ - h2_config *cfg = (h2_config *)h2_config_sget(parms->server); - cfg->tls_flush_count = (int)apr_atoi64(value); - (void)arg; - return NULL; -} - static const char *h2_conf_set_push_diary_size(cmd_parms *parms, void *arg, const char *value) { @@ -657,8 +643,6 @@ const command_rec h2_cmds[] = { RSRC_CONF, "number of bytes on TLS connection before doing max writes"), AP_INIT_TAKE1("H2TLSCoolDownSecs", h2_conf_set_tls_cooldown_secs, NULL, RSRC_CONF, "seconds of idle time on TLS before shrinking writes"), - AP_INIT_TAKE1("H2TLSFlushCount", h2_conf_set_tls_flush_count, NULL, - RSRC_CONF, "number of max TLS records before output is flushed"), AP_INIT_TAKE1("H2Push", h2_conf_set_push, NULL, RSRC_CONF, "off to disable HTTP/2 server push"), AP_INIT_TAKE23("H2PushPriority", h2_conf_add_push_priority, NULL, diff --git a/modules/http2/h2_config.h b/modules/http2/h2_config.h index 60257df427..1f2fe309d0 100644 --- a/modules/http2/h2_config.h +++ b/modules/http2/h2_config.h @@ -42,7 +42,6 @@ typedef enum { H2_CONF_PUSH_DIARY_SIZE, H2_CONF_COPY_FILES, H2_CONF_EARLY_HINTS, - H2_CONF_TLS_FLUSH_COUNT, } h2_config_var_t; struct apr_hash_t; @@ -80,7 +79,6 @@ typedef struct h2_config { int copy_files; /* if files shall be copied vs setaside on output */ apr_array_header_t *push_list;/* list of h2_push_res configurations */ int early_hints; /* support status code 103 */ - int tls_flush_count; /* max # of TLS records until output flushed */ } h2_config; diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index f3126bb71f..2998c8b338 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -132,9 +132,7 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, io->output = apr_brigade_create(c->pool, c->bucket_alloc); io->is_tls = h2_h2_is_tls(c); io->buffer_output = io->is_tls; - io->pass_threshold = (apr_size_t)h2_config_geti64(cfg, H2_CONF_STREAM_MAX_MEM); - io->flush_factor = h2_config_geti(cfg, H2_CONF_TLS_FLUSH_COUNT); - io->speed_factor = 1.0; + io->flush_threshold = (apr_size_t)h2_config_geti64(cfg, H2_CONF_STREAM_MAX_MEM); if (io->is_tls) { /* This is what we start with, @@ -257,7 +255,6 @@ static void check_write_size(h2_conn_io *io) /* long time not written, reset write size */ io->write_size = WRITE_SIZE_INITIAL; io->bytes_written = 0; - io->speed_factor = 1.0; ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io(%ld): timeout write size reset to %ld", (long)io->c->id, (long)io->write_size); @@ -282,7 +279,7 @@ static apr_status_t pass_output(h2_conn_io *io, int flush, apr_status_t status; append_scratch(io); - if (flush) { + if (flush && !io->is_flushed) { b = apr_bucket_flush_create(c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(bb, b); } @@ -300,6 +297,9 @@ static apr_status_t pass_output(h2_conn_io *io, int flush, if (status == APR_SUCCESS) { io->bytes_written += (apr_size_t)bblen; io->last_write = apr_time_now(); + if (flush) { + io->is_flushed = 1; + } } apr_brigade_cleanup(bb); @@ -325,35 +325,26 @@ static apr_status_t pass_output(h2_conn_io *io, int flush, return status; } +int h2_conn_io_needs_flush(h2_conn_io *io) +{ + if (!io->is_flushed) { + apr_off_t len = h2_brigade_mem_size(io->output); + if (len > io->flush_threshold) { + return 1; + } + /* if we do not exceed flush length due to memory limits, + * we want at least flush when we have that amount of data. */ + apr_brigade_length(io->output, 0, &len); + return len > (4 * io->flush_threshold); + } + return 0; +} + apr_status_t h2_conn_io_flush(h2_conn_io *io) { - apr_time_t start = 0; apr_status_t status; - - if (io->needs_flush > 0) { - /* this is a buffer size triggered flush, let's measure how - * long it takes and try to adjust our speed factor accordingly */ - start = apr_time_now(); - } status = pass_output(io, 1, NULL); check_write_size(io); - if (start && status == APR_SUCCESS) { - apr_time_t duration = apr_time_now() - start; - if (duration < apr_time_from_msec(100)) { - io->speed_factor *= 1.0 + ((apr_time_from_msec(100) - duration) - / (float)apr_time_from_msec(100)); - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, io->c, - "h2_conn_io(%ld): incr speed_factor to %f", - io->c->id, io->speed_factor); - } - else if (duration > apr_time_from_msec(200)) { - io->speed_factor *= 0.5; - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, io->c, - "h2_conn_io(%ld): decr speed_factor to %f", - io->c->id, io->speed_factor); - } - } - io->needs_flush = -1; return status; } @@ -367,6 +358,10 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length) apr_status_t status = APR_SUCCESS; apr_size_t remain; + if (length > 0) { + io->is_flushed = 0; + } + if (io->buffer_output) { while (length > 0) { remain = assure_scratch_space(io); @@ -396,7 +391,6 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length) else { status = apr_brigade_write(io->output, NULL, NULL, data, length); } - io->needs_flush = -1; return status; } @@ -405,6 +399,10 @@ apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb) apr_bucket *b; apr_status_t status = APR_SUCCESS; + if (!APR_BRIGADE_EMPTY(bb)) { + io->is_flushed = 0; + } + while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) { b = APR_BRIGADE_FIRST(bb); @@ -447,21 +445,6 @@ apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb) APR_BRIGADE_INSERT_TAIL(io->output, b); } } - io->needs_flush = -1; return status; } -int h2_conn_io_needs_flush(h2_conn_io *io) -{ - if (io->needs_flush < 0) { - apr_off_t len; - apr_brigade_length(io->output, 0, &len); - if (len > (io->pass_threshold * io->flush_factor * io->speed_factor)) { - /* don't want to keep too much around */ - io->needs_flush = 1; - return 1; - } - io->needs_flush = 0; - } - return 0; -} diff --git a/modules/http2/h2_conn_io.h b/modules/http2/h2_conn_io.h index f617018565..0cbe479d1d 100644 --- a/modules/http2/h2_conn_io.h +++ b/modules/http2/h2_conn_io.h @@ -39,10 +39,8 @@ typedef struct { apr_int64_t bytes_written; int buffer_output; - int needs_flush; - apr_size_t pass_threshold; - float flush_factor; - float speed_factor; + apr_size_t flush_threshold; + unsigned int is_flushed : 1; char *scratch; apr_size_t ssize; @@ -76,6 +74,9 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, struct h2_session *session); */ apr_status_t h2_conn_io_flush(h2_conn_io *io); +/** + * Check if the buffered amount of data needs flushing. + */ int h2_conn_io_needs_flush(h2_conn_io *io); #endif /* defined(__mod_h2__h2_conn_io__) */ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 5823bd8e54..b535a02a13 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -17,6 +17,7 @@ #include <stddef.h> #include <stdlib.h> +#include <apr_atomic.h> #include <apr_thread_mutex.h> #include <apr_thread_cond.h> #include <apr_strings.h> @@ -143,6 +144,12 @@ static void stream_output_consumed(void *ctx, } } +static void stream_input_ev(void *ctx, h2_bucket_beam *beam) +{ + h2_mplx *m = ctx; + apr_atomic_set32(&m->event_pending, 1); +} + static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) { @@ -337,18 +344,18 @@ int h2_mplx_shutdown(h2_mplx *m) return max_stream_started; } -static void input_consumed_signal(h2_mplx *m, h2_stream *stream) +static int input_consumed_signal(h2_mplx *m, h2_stream *stream) { - if (stream->input && stream->started) { - h2_beam_send(stream->input, NULL, 0); /* trigger updates */ + if (stream->input) { + return h2_beam_report_consumption(stream->input); } + return 0; } static int output_consumed_signal(h2_mplx *m, h2_task *task) { - if (task->output.beam && task->worker_started && task->assigned) { - /* trigger updates */ - h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ); + if (task->output.beam) { + return h2_beam_report_consumption(task->output.beam); } return 0; } @@ -438,7 +445,7 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) h2_stream_cleanup(stream); m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); - h2_beam_on_consumed(stream->input, NULL, NULL); + h2_beam_on_consumed(stream->input, NULL, NULL, NULL); /* Let anyone blocked reading know that there is no more to come */ h2_beam_abort(stream->input); /* Remove mutex after, so that abort still finds cond to signal */ @@ -711,7 +718,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) "h2_mplx(%s): out open", task->id); } - h2_beam_on_consumed(stream->output, stream_output_consumed, task); + h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, task); h2_beam_on_produced(stream->output, output_produced, m); beamed_count = h2_beam_get_files_beamed(stream->output); if (m->tx_handles_reserved >= beamed_count) { @@ -785,7 +792,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, if (m->aborted) { status = APR_ECONNABORTED; } - else if (!h2_iq_empty(m->readyq)) { + else if (apr_atomic_read32(&m->event_pending) > 0) { status = APR_SUCCESS; } else { @@ -809,6 +816,7 @@ static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response) ap_assert(m); ap_assert(stream); h2_iq_append(m->readyq, stream->id); + apr_atomic_set32(&m->event_pending, 1); if (m->added_output) { apr_thread_cond_signal(m->added_output); } @@ -847,6 +855,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, else { h2_ihash_add(m->streams, stream); if (h2_stream_is_ready(stream)) { + apr_atomic_set32(&m->event_pending, 1); h2_iq_append(m->readyq, stream->id); } else { @@ -912,7 +921,8 @@ static h2_task *next_stream_task(h2_mplx *m) } h2_beam_timeout_set(stream->input, m->stream_timeout); - h2_beam_on_consumed(stream->input, stream_input_consumed, m); + h2_beam_on_consumed(stream->input, stream_input_ev, + stream_input_consumed, m); h2_beam_on_file_beam(stream->input, can_beam_file, m); h2_beam_mutex_set(stream->input, beam_enter, task->cond, m); @@ -1027,7 +1037,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) task->id); /* more data will not arrive, resume the stream */ have_out_data_for(m, stream, 0); - h2_beam_on_consumed(stream->output, NULL, NULL); + h2_beam_on_consumed(stream->output, NULL, NULL, NULL); h2_beam_mutex_set(stream->output, NULL, NULL, NULL); } else { @@ -1041,7 +1051,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) * called from a worker thread and freeing memory pools * is only safe in the only thread using it (and its * parent pool / allocator) */ - h2_beam_on_consumed(stream->output, NULL, NULL); + h2_beam_on_consumed(stream->output, NULL, NULL, NULL); h2_beam_mutex_set(stream->output, NULL, NULL, NULL); h2_ihash_remove(m->shold, stream->id); h2_ihash_add(m->spurge, stream); @@ -1351,7 +1361,12 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn, * mplx master events dispatching ******************************************************************************/ -static int update_window(void *ctx, void *val) +int h2_mplx_has_master_events(h2_mplx *m) +{ + return apr_atomic_read32(&m->event_pending) > 0; +} + +static int report_consumption_iter(void *ctx, void *val) { input_consumed_signal(ctx, val); return 1; @@ -1367,25 +1382,29 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, h2_stream *stream; size_t i, n; + if (!h2_mplx_has_master_events(m)) { + return APR_EAGAIN; + } + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, - "h2_mplx(%ld): dispatch events", m->id); - + "h2_mplx(%ld): dispatch events", m->id); + apr_atomic_set32(&m->event_pending, 0); /* update input windows for streams */ - h2_ihash_iter(m->streams, update_window, m); - if (on_resume && !h2_iq_empty(m->readyq)) { + h2_ihash_iter(m->streams, report_consumption_iter, m); + + if (!h2_iq_empty(m->readyq)) { n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids)); for (i = 0; i < n; ++i) { stream = h2_ihash_get(m->streams, ids[i]); if (stream) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, - "h2_mplx(%ld-%d): on_resume", - m->id, stream->id); on_resume(on_ctx, stream); } } } - + if (!h2_iq_empty(m->readyq)) { + apr_atomic_set32(&m->event_pending, 1); + } leave_mutex(m, acquired); } return status; @@ -1400,6 +1419,7 @@ apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id) h2_stream *s = h2_ihash_get(m->streams, stream_id); if (s) { h2_iq_append(m->readyq, stream_id); + apr_atomic_set32(&m->event_pending, 1); } leave_mutex(m, acquired); } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 25e07005e7..a82918fbbc 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -67,6 +67,7 @@ struct h2_mplx { APR_RING_ENTRY(h2_mplx) link; + unsigned int event_pending; unsigned int aborted : 1; unsigned int need_registration : 1; @@ -222,6 +223,12 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx); typedef apr_status_t stream_ev_callback(void *ctx, struct h2_stream *stream); /** + * Check if the multiplexer has events for the master connection pending. + * @return != 0 iff there are events pending + */ +int h2_mplx_has_master_events(h2_mplx *m); + +/** * Dispatch events for the master connection, such as ± @param m the multiplexer * @param on_resume new output data has arrived for a suspended stream diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index ad313f3bdd..93cfe33718 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -48,6 +48,7 @@ static apr_status_t dispatch_master(h2_session *session); +static apr_status_t h2_session_read(h2_session *session, int block); static int h2_session_status_from_apr_status(apr_status_t rv) { @@ -240,17 +241,6 @@ static ssize_t send_cb(nghttp2_session *ngh2, (void)ngh2; (void)flags; - if (h2_conn_io_needs_flush(&session->io)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_session(%ld): blocking due to io flush", - session->id); - status = h2_conn_io_flush(&session->io); - if (status != APR_SUCCESS) { - return h2_session_status_from_apr_status(status); - } - return NGHTTP2_ERR_WOULDBLOCK; - } - status = h2_conn_io_write(&session->io, (const char *)data, length); if (status == APR_SUCCESS) { return length; @@ -569,6 +559,16 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, return 0; } +static int h2_session_continue_data(h2_session *session) { + if (h2_mplx_has_master_events(session->mplx)) { + return 0; + } + if (h2_conn_io_needs_flush(&session->io)) { + return 0; + } + return 1; +} + static char immortal_zeros[H2_MAX_PADLEN]; static int on_send_data_cb(nghttp2_session *ngh2, @@ -589,17 +589,10 @@ static int on_send_data_cb(nghttp2_session *ngh2, (void)ngh2; (void)source; - if (h2_conn_io_needs_flush(&session->io)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): blocking due to io flush", - session->id, stream_id); - status = h2_conn_io_flush(&session->io); - if (status != APR_SUCCESS) { - return h2_session_status_from_apr_status(status); - } + if (!h2_session_continue_data(session)) { return NGHTTP2_ERR_WOULDBLOCK; } - + if (frame->data.padlen > H2_MAX_PADLEN) { return NGHTTP2_ERR_PROTO; } @@ -1418,8 +1411,6 @@ static apr_status_t h2_session_send(h2_session *session) apr_socket_timeout_set(socket, session->s->timeout); } - /* This sends one round of frames from every able stream, plus - * settings etc. if accumulated */ rv = nghttp2_session_send(session->ngh2); if (socket) { @@ -2058,7 +2049,11 @@ static apr_status_t dispatch_master(h2_session *session) { status = h2_mplx_dispatch_master_events(session->mplx, on_stream_resume, session); - if (status != APR_SUCCESS) { + if (status == APR_EAGAIN) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c, + "h2_session(%ld): no master event available", session->id); + } + else if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c, "h2_session(%ld): dispatch error", session->id); dispatch_event(session, H2_SESSION_EV_CONN_ERROR, @@ -2118,6 +2113,14 @@ apr_status_t h2_session_process(h2_session *session, int async) case H2_SESSION_ST_IDLE: /* make certain, we send everything before we idle */ h2_conn_io_flush(&session->io); + /* We trust our connection into the default timeout/keepalive + * handling of the core filters/mpm iff: + * - keep_sync_until is not set + * - we have an async mpm + * - we have no open streams to process + * - we are not sitting on a Upgrade: request + * - we already have seen at least one request + */ if (!session->keep_sync_until && async && !session->open_streams && !session->r && session->remote.emitted_count) { if (trace) { @@ -2126,15 +2129,6 @@ apr_status_t h2_session_process(h2_session *session, int async) "%d streams open", session->id, session->open_streams); } - /* We do not return to the async mpm immediately, since under - * load, mpms show the tendency to throw keep_alive connections - * away very rapidly. - * So, if we are still processing streams, we wait for the - * normal timeout first and, on timeout, close. - * If we have no streams, we still wait a short amount of - * time here for the next frame to arrive, before handing - * it to keep_alive processing of the mpm. - */ status = h2_session_read(session, 0); if (status == APR_SUCCESS) { @@ -2219,7 +2213,6 @@ apr_status_t h2_session_process(h2_session *session, int async) dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "error"); } } - break; case H2_SESSION_ST_BUSY: @@ -2244,13 +2237,16 @@ apr_status_t h2_session_process(h2_session *session, int async) } status = dispatch_master(session); - if (status != APR_SUCCESS) { + if (status != APR_SUCCESS && status != APR_EAGAIN) { break; } if (nghttp2_session_want_write(session->ngh2)) { ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL); status = h2_session_send(session); + if (status == APR_SUCCESS) { + status = h2_conn_io_flush(&session->io); + } if (status != APR_SUCCESS) { dispatch_event(session, H2_SESSION_EV_CONN_ERROR, H2_ERR_INTERNAL_ERROR, "writing"); diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 7cec844b22..7f919ab6e2 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -481,6 +481,7 @@ apr_status_t h2_stream_close_input(h2_stream *stream) APR_BRIGADE_INSERT_TAIL(tmp, b); status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); apr_brigade_destroy(tmp); + h2_beam_close(stream->input); return status; } diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index 3a5e64e693..7a6793ff0f 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.8.7" +#define MOD_HTTP2_VERSION "1.8.8" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010807 +#define MOD_HTTP2_VERSION_NUM 0x010808 #endif /* mod_h2_h2_version_h */ |