summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Eissing <icing@apache.org>2017-01-08 20:11:27 +0000
committerStefan Eissing <icing@apache.org>2017-01-08 20:11:27 +0000
commit728b6cde480fa6b04cfbea0959ca0ac8bfc37ed3 (patch)
tree5cd626c92b22cf35a823eba793e09d59dd888343
parent564a90226180cbfdd3e2cb336be1d04998cd7543 (diff)
downloadhttpd-728b6cde480fa6b04cfbea0959ca0ac8bfc37ed3.tar.gz
On the trunk:
*) mod_http2: streaming of request output now reacts timely to data from other streams becoming available. Same for new incoming requests. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1777907 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--CHANGES10
-rw-r--r--modules/http2/h2_bucket_beam.c80
-rw-r--r--modules/http2/h2_bucket_beam.h42
-rw-r--r--modules/http2/h2_config.c16
-rw-r--r--modules/http2/h2_config.h2
-rw-r--r--modules/http2/h2_conn_io.c73
-rw-r--r--modules/http2/h2_conn_io.h9
-rw-r--r--modules/http2/h2_mplx.c62
-rw-r--r--modules/http2/h2_mplx.h7
-rw-r--r--modules/http2/h2_session.c64
-rw-r--r--modules/http2/h2_stream.c1
-rw-r--r--modules/http2/h2_version.h4
12 files changed, 202 insertions, 168 deletions
diff --git a/CHANGES b/CHANGES
index 97ee634185..1dfa156b01 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,15 +1,13 @@
-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) mod_http2: streaming of request output now reacts timely to data
+ from other streams becoming available. Same for new incoming requests.
+ [Stefan Eissing]
+
*) core: EBCDIC fixes for interim responses with additional headers.
[Eric Covener]
- *) mod_http2: fix for possible page fault when stream is resumed during
- session shutdown. [sidney-j-r-m (github)]
-
- *) mod_http2: fix for h2 session ignoring new responses while already
- open streams continue to have data available. [Stefan Eissing]
-
*) mod_remoteip: Add support for PROXY protocol (code donated by Cloudzilla).
Add ability for PROXY protocol processing to be optional to donated code.
See also: http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt
diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c
index add3065c97..39927fac4d 100644
--- a/modules/http2/h2_bucket_beam.c
+++ b/modules/http2/h2_bucket_beam.c
@@ -14,6 +14,7 @@
*/
#include <apr_lib.h>
+#include <apr_atomic.h>
#include <apr_strings.h>
#include <apr_time.h>
#include <apr_buckets.h>
@@ -243,25 +244,29 @@ static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
}
}
-static void report_consumption(h2_bucket_beam *beam, int force)
+static int report_consumption(h2_bucket_beam *beam)
{
- if (force || beam->received_bytes != beam->reported_consumed_bytes) {
- if (beam->consumed_fn) {
- beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
- - beam->reported_consumed_bytes);
+ int rv = 0;
+ if (apr_atomic_read32(&beam->cons_ev_pending)) {
+ if (beam->cons_io_cb) {
+ beam->cons_io_cb(beam->cons_ctx, beam, beam->received_bytes
+ - beam->cons_bytes_reported);
+ rv = 1;
}
- beam->reported_consumed_bytes = beam->received_bytes;
+ beam->cons_bytes_reported = beam->received_bytes;
+ apr_atomic_set32(&beam->cons_ev_pending, 0);
}
+ return rv;
}
-static void report_production(h2_bucket_beam *beam, int force)
+static void report_prod_io(h2_bucket_beam *beam, int force)
{
- if (force || beam->sent_bytes != beam->reported_produced_bytes) {
- if (beam->produced_fn) {
- beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
- - beam->reported_produced_bytes);
+ if (force || beam->prod_bytes_reported != beam->sent_bytes) {
+ if (beam->prod_io_cb) {
+ beam->prod_io_cb(beam->prod_ctx, beam, beam->sent_bytes
+ - beam->prod_bytes_reported);
}
- beam->reported_produced_bytes = beam->sent_bytes;
+ beam->prod_bytes_reported = beam->sent_bytes;
}
}
@@ -322,7 +327,7 @@ static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
while (!beam->aborted && *premain <= 0
&& (block == APR_BLOCK_READ) && pbl->mutex) {
apr_status_t status;
- report_production(beam, 1);
+ report_prod_io(beam, 1);
status = wait_cond(beam, pbl->mutex);
if (APR_STATUS_IS_TIMEUP(status)) {
return status;
@@ -453,7 +458,7 @@ static apr_status_t beam_send_cleanup(void *data)
/* sender has gone away, clear up all references to its memory */
r_purge_sent(beam);
h2_blist_cleanup(&beam->send_list);
- report_consumption(beam, 0);
+ report_consumption(beam);
while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
H2_BPROXY_REMOVE(proxy);
@@ -634,7 +639,7 @@ void h2_beam_abort(h2_bucket_beam *beam)
beam->aborted = 1;
r_purge_sent(beam);
h2_blist_cleanup(&beam->send_list);
- report_consumption(beam, 0);
+ report_consumption(beam);
}
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
@@ -650,7 +655,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam)
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
r_purge_sent(beam);
beam_close(beam);
- report_consumption(beam, 0);
+ report_consumption(beam);
leave_yellow(beam, &bl);
}
return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
@@ -851,12 +856,12 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
b = APR_BRIGADE_FIRST(red_brigade);
status = append_bucket(beam, b, block, &bl);
}
- report_production(beam, force_report);
+ report_prod_io(beam, force_report);
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
}
}
- report_consumption(beam, 0);
+ report_consumption(beam);
leave_yellow(beam, &bl);
}
return status;
@@ -872,6 +877,7 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
int transferred = 0;
apr_status_t status = APR_SUCCESS;
apr_off_t remain = readbytes;
+ int transferred_buckets = 0;
/* Called from the green thread to take buckets from the beam */
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
@@ -968,6 +974,8 @@ transfer:
APR_BUCKET_REMOVE(bred);
H2_BLIST_INSERT_TAIL(&beam->hold_list, bred);
beam->received_bytes += bred->length;
+ ++transferred_buckets;
+
if (bgreen) {
APR_BRIGADE_INSERT_TAIL(bb, bgreen);
remain -= bgreen->length;
@@ -1000,6 +1008,13 @@ transfer:
}
}
+ if (transferred_buckets > 0) {
+ apr_atomic_set32(&beam->cons_ev_pending, 1);
+ if (beam->cons_ev_cb) {
+ beam->cons_ev_cb(beam->cons_ctx, beam);
+ }
+ }
+
if (beam->closed
&& (!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer))
&& H2_BLIST_EMPTY(&beam->send_list)) {
@@ -1042,25 +1057,25 @@ leave:
}
void h2_beam_on_consumed(h2_bucket_beam *beam,
- h2_beam_io_callback *cb, void *ctx)
+ h2_beam_ev_callback *ev_cb,
+ h2_beam_io_callback *io_cb, void *ctx)
{
h2_beam_lock bl;
-
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- beam->consumed_fn = cb;
- beam->consumed_ctx = ctx;
+ beam->cons_ev_cb = ev_cb;
+ beam->cons_io_cb = io_cb;
+ beam->cons_ctx = ctx;
leave_yellow(beam, &bl);
}
}
void h2_beam_on_produced(h2_bucket_beam *beam,
- h2_beam_io_callback *cb, void *ctx)
+ h2_beam_io_callback *io_cb, void *ctx)
{
h2_beam_lock bl;
-
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- beam->produced_fn = cb;
- beam->produced_ctx = ctx;
+ beam->prod_io_cb = io_cb;
+ beam->prod_ctx = ctx;
leave_yellow(beam, &bl);
}
}
@@ -1173,3 +1188,16 @@ int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
return 0;
}
+int h2_beam_report_consumption(h2_bucket_beam *beam)
+{
+ if (apr_atomic_read32(&beam->cons_ev_pending)) {
+ h2_beam_lock bl;
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+ int rv = report_consumption(beam);
+ leave_yellow(beam, &bl);
+ return rv;
+ }
+ }
+ return 0;
+}
+
diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h
index 39fd476c20..a4ead053ce 100644
--- a/modules/http2/h2_bucket_beam.h
+++ b/modules/http2/h2_bucket_beam.h
@@ -154,6 +154,7 @@ typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl);
typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam,
apr_off_t bytes);
+typedef void h2_beam_ev_callback(void *ctx, h2_bucket_beam *beam);
typedef struct h2_beam_proxy h2_beam_proxy;
typedef struct {
@@ -205,12 +206,17 @@ struct h2_bucket_beam {
h2_beam_mutex_enter *m_enter;
struct apr_thread_cond_t *m_cond;
- apr_off_t reported_consumed_bytes; /* amount of bytes reported as consumed */
- h2_beam_io_callback *consumed_fn;
- void *consumed_ctx;
- apr_off_t reported_produced_bytes; /* amount of bytes reported as produced */
- h2_beam_io_callback *produced_fn;
- void *produced_ctx;
+ apr_uint32_t cons_ev_pending; /* != 0, consumer event pending */
+ apr_off_t cons_bytes_reported; /* amount of bytes reported as consumed */
+ h2_beam_ev_callback *cons_ev_cb;
+ h2_beam_io_callback *cons_io_cb;
+ void *cons_ctx;
+
+ apr_uint32_t prod_ev_pending; /* != 0, producer event pending */
+ apr_off_t prod_bytes_reported; /* amount of bytes reported as produced */
+ h2_beam_io_callback *prod_io_cb;
+ void *prod_ctx;
+
h2_beam_can_beam_callback *can_beam_fn;
void *can_beam_ctx;
};
@@ -336,26 +342,38 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam);
* amount of bytes that have been consumed by the receiver, since the
* last callback invocation or reset.
* @param beam the beam to set the callback on
- * @param cb the callback or NULL
+ * @param ev_cb the callback or NULL, called when bytes are consumed
+ * @param io_cb the callback or NULL, called on sender with bytes consumed
* @param ctx the context to use in callback invocation
*
- * Call from the sender side, callbacks invoked on sender side.
+ * Call from the sender side, io callbacks invoked on sender side, ev callback
+ * from any side.
*/
void h2_beam_on_consumed(h2_bucket_beam *beam,
- h2_beam_io_callback *cb, void *ctx);
+ h2_beam_ev_callback *ev_cb,
+ h2_beam_io_callback *io_cb, void *ctx);
+
+/**
+ * Call any registered consumed handler, if any changes have happened
+ * since the last invocation.
+ * @return !=0 iff a handler has been called
+ *
+ * Needs to be invoked from the sending side.
+ */
+int h2_beam_report_consumption(h2_bucket_beam *beam);
/**
* Register a callback to be invoked on the receiver side with the
* amount of bytes that have been produces by the sender, since the
* last callback invocation or reset.
* @param beam the beam to set the callback on
- * @param cb the callback or NULL
+ * @param io_cb the callback or NULL, called on receiver with bytes produced
* @param ctx the context to use in callback invocation
*
- * Call from the receiver side, callbacks invoked on receiver side.
+ * Call from the receiver side, callbacks invoked on either side.
*/
void h2_beam_on_produced(h2_bucket_beam *beam,
- h2_beam_io_callback *cb, void *ctx);
+ h2_beam_io_callback *io_cb, void *ctx);
void h2_beam_on_file_beam(h2_bucket_beam *beam,
h2_beam_can_beam_callback *cb, void *ctx);
diff --git a/modules/http2/h2_config.c b/modules/http2/h2_config.c
index f91c8c257a..669d20e75c 100644
--- a/modules/http2/h2_config.c
+++ b/modules/http2/h2_config.c
@@ -64,7 +64,6 @@ static h2_config defconf = {
0, /* copy files across threads */
NULL, /* push list */
0, /* early hints, http status 103 */
- 2, /* TLS records flush count */
};
void h2_config_init(apr_pool_t *pool)
@@ -100,7 +99,6 @@ static void *h2_config_create(apr_pool_t *pool,
conf->copy_files = DEF_VAL;
conf->push_list = NULL;
conf->early_hints = DEF_VAL;
- conf->tls_flush_count = DEF_VAL;
return conf;
}
@@ -153,7 +151,6 @@ static void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv)
n->push_list = add->push_list? add->push_list : base->push_list;
}
n->early_hints = H2_CONFIG_GET(add, base, early_hints);
- n->tls_flush_count = H2_CONFIG_GET(add, base, tls_flush_count);
return n;
}
@@ -211,8 +208,6 @@ apr_int64_t h2_config_geti64(const h2_config *conf, h2_config_var_t var)
return H2_CONFIG_GET(conf, &defconf, copy_files);
case H2_CONF_EARLY_HINTS:
return H2_CONFIG_GET(conf, &defconf, early_hints);
- case H2_CONF_TLS_FLUSH_COUNT:
- return H2_CONFIG_GET(conf, &defconf, tls_flush_count);
default:
return DEF_VAL;
}
@@ -510,15 +505,6 @@ static const char *h2_conf_set_tls_cooldown_secs(cmd_parms *parms,
return NULL;
}
-static const char *h2_conf_set_tls_flush_count(cmd_parms *parms,
- void *arg, const char *value)
-{
- h2_config *cfg = (h2_config *)h2_config_sget(parms->server);
- cfg->tls_flush_count = (int)apr_atoi64(value);
- (void)arg;
- return NULL;
-}
-
static const char *h2_conf_set_push_diary_size(cmd_parms *parms,
void *arg, const char *value)
{
@@ -657,8 +643,6 @@ const command_rec h2_cmds[] = {
RSRC_CONF, "number of bytes on TLS connection before doing max writes"),
AP_INIT_TAKE1("H2TLSCoolDownSecs", h2_conf_set_tls_cooldown_secs, NULL,
RSRC_CONF, "seconds of idle time on TLS before shrinking writes"),
- AP_INIT_TAKE1("H2TLSFlushCount", h2_conf_set_tls_flush_count, NULL,
- RSRC_CONF, "number of max TLS records before output is flushed"),
AP_INIT_TAKE1("H2Push", h2_conf_set_push, NULL,
RSRC_CONF, "off to disable HTTP/2 server push"),
AP_INIT_TAKE23("H2PushPriority", h2_conf_add_push_priority, NULL,
diff --git a/modules/http2/h2_config.h b/modules/http2/h2_config.h
index 60257df427..1f2fe309d0 100644
--- a/modules/http2/h2_config.h
+++ b/modules/http2/h2_config.h
@@ -42,7 +42,6 @@ typedef enum {
H2_CONF_PUSH_DIARY_SIZE,
H2_CONF_COPY_FILES,
H2_CONF_EARLY_HINTS,
- H2_CONF_TLS_FLUSH_COUNT,
} h2_config_var_t;
struct apr_hash_t;
@@ -80,7 +79,6 @@ typedef struct h2_config {
int copy_files; /* if files shall be copied vs setaside on output */
apr_array_header_t *push_list;/* list of h2_push_res configurations */
int early_hints; /* support status code 103 */
- int tls_flush_count; /* max # of TLS records until output flushed */
} h2_config;
diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c
index f3126bb71f..2998c8b338 100644
--- a/modules/http2/h2_conn_io.c
+++ b/modules/http2/h2_conn_io.c
@@ -132,9 +132,7 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
io->output = apr_brigade_create(c->pool, c->bucket_alloc);
io->is_tls = h2_h2_is_tls(c);
io->buffer_output = io->is_tls;
- io->pass_threshold = (apr_size_t)h2_config_geti64(cfg, H2_CONF_STREAM_MAX_MEM);
- io->flush_factor = h2_config_geti(cfg, H2_CONF_TLS_FLUSH_COUNT);
- io->speed_factor = 1.0;
+ io->flush_threshold = (apr_size_t)h2_config_geti64(cfg, H2_CONF_STREAM_MAX_MEM);
if (io->is_tls) {
/* This is what we start with,
@@ -257,7 +255,6 @@ static void check_write_size(h2_conn_io *io)
/* long time not written, reset write size */
io->write_size = WRITE_SIZE_INITIAL;
io->bytes_written = 0;
- io->speed_factor = 1.0;
ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
"h2_conn_io(%ld): timeout write size reset to %ld",
(long)io->c->id, (long)io->write_size);
@@ -282,7 +279,7 @@ static apr_status_t pass_output(h2_conn_io *io, int flush,
apr_status_t status;
append_scratch(io);
- if (flush) {
+ if (flush && !io->is_flushed) {
b = apr_bucket_flush_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(bb, b);
}
@@ -300,6 +297,9 @@ static apr_status_t pass_output(h2_conn_io *io, int flush,
if (status == APR_SUCCESS) {
io->bytes_written += (apr_size_t)bblen;
io->last_write = apr_time_now();
+ if (flush) {
+ io->is_flushed = 1;
+ }
}
apr_brigade_cleanup(bb);
@@ -325,35 +325,26 @@ static apr_status_t pass_output(h2_conn_io *io, int flush,
return status;
}
+int h2_conn_io_needs_flush(h2_conn_io *io)
+{
+ if (!io->is_flushed) {
+ apr_off_t len = h2_brigade_mem_size(io->output);
+ if (len > io->flush_threshold) {
+ return 1;
+ }
+ /* if we do not exceed flush length due to memory limits,
+ * we want at least flush when we have that amount of data. */
+ apr_brigade_length(io->output, 0, &len);
+ return len > (4 * io->flush_threshold);
+ }
+ return 0;
+}
+
apr_status_t h2_conn_io_flush(h2_conn_io *io)
{
- apr_time_t start = 0;
apr_status_t status;
-
- if (io->needs_flush > 0) {
- /* this is a buffer size triggered flush, let's measure how
- * long it takes and try to adjust our speed factor accordingly */
- start = apr_time_now();
- }
status = pass_output(io, 1, NULL);
check_write_size(io);
- if (start && status == APR_SUCCESS) {
- apr_time_t duration = apr_time_now() - start;
- if (duration < apr_time_from_msec(100)) {
- io->speed_factor *= 1.0 + ((apr_time_from_msec(100) - duration)
- / (float)apr_time_from_msec(100));
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, io->c,
- "h2_conn_io(%ld): incr speed_factor to %f",
- io->c->id, io->speed_factor);
- }
- else if (duration > apr_time_from_msec(200)) {
- io->speed_factor *= 0.5;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, io->c,
- "h2_conn_io(%ld): decr speed_factor to %f",
- io->c->id, io->speed_factor);
- }
- }
- io->needs_flush = -1;
return status;
}
@@ -367,6 +358,10 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length)
apr_status_t status = APR_SUCCESS;
apr_size_t remain;
+ if (length > 0) {
+ io->is_flushed = 0;
+ }
+
if (io->buffer_output) {
while (length > 0) {
remain = assure_scratch_space(io);
@@ -396,7 +391,6 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length)
else {
status = apr_brigade_write(io->output, NULL, NULL, data, length);
}
- io->needs_flush = -1;
return status;
}
@@ -405,6 +399,10 @@ apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb)
apr_bucket *b;
apr_status_t status = APR_SUCCESS;
+ if (!APR_BRIGADE_EMPTY(bb)) {
+ io->is_flushed = 0;
+ }
+
while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
b = APR_BRIGADE_FIRST(bb);
@@ -447,21 +445,6 @@ apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb)
APR_BRIGADE_INSERT_TAIL(io->output, b);
}
}
- io->needs_flush = -1;
return status;
}
-int h2_conn_io_needs_flush(h2_conn_io *io)
-{
- if (io->needs_flush < 0) {
- apr_off_t len;
- apr_brigade_length(io->output, 0, &len);
- if (len > (io->pass_threshold * io->flush_factor * io->speed_factor)) {
- /* don't want to keep too much around */
- io->needs_flush = 1;
- return 1;
- }
- io->needs_flush = 0;
- }
- return 0;
-}
diff --git a/modules/http2/h2_conn_io.h b/modules/http2/h2_conn_io.h
index f617018565..0cbe479d1d 100644
--- a/modules/http2/h2_conn_io.h
+++ b/modules/http2/h2_conn_io.h
@@ -39,10 +39,8 @@ typedef struct {
apr_int64_t bytes_written;
int buffer_output;
- int needs_flush;
- apr_size_t pass_threshold;
- float flush_factor;
- float speed_factor;
+ apr_size_t flush_threshold;
+ unsigned int is_flushed : 1;
char *scratch;
apr_size_t ssize;
@@ -76,6 +74,9 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, struct h2_session *session);
*/
apr_status_t h2_conn_io_flush(h2_conn_io *io);
+/**
+ * Check if the buffered amount of data needs flushing.
+ */
int h2_conn_io_needs_flush(h2_conn_io *io);
#endif /* defined(__mod_h2__h2_conn_io__) */
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 5823bd8e54..b535a02a13 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -17,6 +17,7 @@
#include <stddef.h>
#include <stdlib.h>
+#include <apr_atomic.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <apr_strings.h>
@@ -143,6 +144,12 @@ static void stream_output_consumed(void *ctx,
}
}
+static void stream_input_ev(void *ctx, h2_bucket_beam *beam)
+{
+ h2_mplx *m = ctx;
+ apr_atomic_set32(&m->event_pending, 1);
+}
+
static void stream_input_consumed(void *ctx,
h2_bucket_beam *beam, apr_off_t length)
{
@@ -337,18 +344,18 @@ int h2_mplx_shutdown(h2_mplx *m)
return max_stream_started;
}
-static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
+static int input_consumed_signal(h2_mplx *m, h2_stream *stream)
{
- if (stream->input && stream->started) {
- h2_beam_send(stream->input, NULL, 0); /* trigger updates */
+ if (stream->input) {
+ return h2_beam_report_consumption(stream->input);
}
+ return 0;
}
static int output_consumed_signal(h2_mplx *m, h2_task *task)
{
- if (task->output.beam && task->worker_started && task->assigned) {
- /* trigger updates */
- h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+ if (task->output.beam) {
+ return h2_beam_report_consumption(task->output.beam);
}
return 0;
}
@@ -438,7 +445,7 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
h2_stream_cleanup(stream);
m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
- h2_beam_on_consumed(stream->input, NULL, NULL);
+ h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
/* Let anyone blocked reading know that there is no more to come */
h2_beam_abort(stream->input);
/* Remove mutex after, so that abort still finds cond to signal */
@@ -711,7 +718,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
"h2_mplx(%s): out open", task->id);
}
- h2_beam_on_consumed(stream->output, stream_output_consumed, task);
+ h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, task);
h2_beam_on_produced(stream->output, output_produced, m);
beamed_count = h2_beam_get_files_beamed(stream->output);
if (m->tx_handles_reserved >= beamed_count) {
@@ -785,7 +792,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
if (m->aborted) {
status = APR_ECONNABORTED;
}
- else if (!h2_iq_empty(m->readyq)) {
+ else if (apr_atomic_read32(&m->event_pending) > 0) {
status = APR_SUCCESS;
}
else {
@@ -809,6 +816,7 @@ static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response)
ap_assert(m);
ap_assert(stream);
h2_iq_append(m->readyq, stream->id);
+ apr_atomic_set32(&m->event_pending, 1);
if (m->added_output) {
apr_thread_cond_signal(m->added_output);
}
@@ -847,6 +855,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
else {
h2_ihash_add(m->streams, stream);
if (h2_stream_is_ready(stream)) {
+ apr_atomic_set32(&m->event_pending, 1);
h2_iq_append(m->readyq, stream->id);
}
else {
@@ -912,7 +921,8 @@ static h2_task *next_stream_task(h2_mplx *m)
}
h2_beam_timeout_set(stream->input, m->stream_timeout);
- h2_beam_on_consumed(stream->input, stream_input_consumed, m);
+ h2_beam_on_consumed(stream->input, stream_input_ev,
+ stream_input_consumed, m);
h2_beam_on_file_beam(stream->input, can_beam_file, m);
h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
@@ -1027,7 +1037,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
task->id);
/* more data will not arrive, resume the stream */
have_out_data_for(m, stream, 0);
- h2_beam_on_consumed(stream->output, NULL, NULL);
+ h2_beam_on_consumed(stream->output, NULL, NULL, NULL);
h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
}
else {
@@ -1041,7 +1051,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
* called from a worker thread and freeing memory pools
* is only safe in the only thread using it (and its
* parent pool / allocator) */
- h2_beam_on_consumed(stream->output, NULL, NULL);
+ h2_beam_on_consumed(stream->output, NULL, NULL, NULL);
h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
h2_ihash_remove(m->shold, stream->id);
h2_ihash_add(m->spurge, stream);
@@ -1351,7 +1361,12 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
* mplx master events dispatching
******************************************************************************/
-static int update_window(void *ctx, void *val)
+int h2_mplx_has_master_events(h2_mplx *m)
+{
+ return apr_atomic_read32(&m->event_pending) > 0;
+}
+
+static int report_consumption_iter(void *ctx, void *val)
{
input_consumed_signal(ctx, val);
return 1;
@@ -1367,25 +1382,29 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
h2_stream *stream;
size_t i, n;
+ if (!h2_mplx_has_master_events(m)) {
+ return APR_EAGAIN;
+ }
+
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
- "h2_mplx(%ld): dispatch events", m->id);
-
+ "h2_mplx(%ld): dispatch events", m->id);
+ apr_atomic_set32(&m->event_pending, 0);
/* update input windows for streams */
- h2_ihash_iter(m->streams, update_window, m);
- if (on_resume && !h2_iq_empty(m->readyq)) {
+ h2_ihash_iter(m->streams, report_consumption_iter, m);
+
+ if (!h2_iq_empty(m->readyq)) {
n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
for (i = 0; i < n; ++i) {
stream = h2_ihash_get(m->streams, ids[i]);
if (stream) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
- "h2_mplx(%ld-%d): on_resume",
- m->id, stream->id);
on_resume(on_ctx, stream);
}
}
}
-
+ if (!h2_iq_empty(m->readyq)) {
+ apr_atomic_set32(&m->event_pending, 1);
+ }
leave_mutex(m, acquired);
}
return status;
@@ -1400,6 +1419,7 @@ apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
h2_stream *s = h2_ihash_get(m->streams, stream_id);
if (s) {
h2_iq_append(m->readyq, stream_id);
+ apr_atomic_set32(&m->event_pending, 1);
}
leave_mutex(m, acquired);
}
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h
index 25e07005e7..a82918fbbc 100644
--- a/modules/http2/h2_mplx.h
+++ b/modules/http2/h2_mplx.h
@@ -67,6 +67,7 @@ struct h2_mplx {
APR_RING_ENTRY(h2_mplx) link;
+ unsigned int event_pending;
unsigned int aborted : 1;
unsigned int need_registration : 1;
@@ -222,6 +223,12 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx);
typedef apr_status_t stream_ev_callback(void *ctx, struct h2_stream *stream);
/**
+ * Check if the multiplexer has events for the master connection pending.
+ * @return != 0 iff there are events pending
+ */
+int h2_mplx_has_master_events(h2_mplx *m);
+
+/**
* Dispatch events for the master connection, such as
± @param m the multiplexer
* @param on_resume new output data has arrived for a suspended stream
diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c
index ad313f3bdd..93cfe33718 100644
--- a/modules/http2/h2_session.c
+++ b/modules/http2/h2_session.c
@@ -48,6 +48,7 @@
static apr_status_t dispatch_master(h2_session *session);
+static apr_status_t h2_session_read(h2_session *session, int block);
static int h2_session_status_from_apr_status(apr_status_t rv)
{
@@ -240,17 +241,6 @@ static ssize_t send_cb(nghttp2_session *ngh2,
(void)ngh2;
(void)flags;
- if (h2_conn_io_needs_flush(&session->io)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_session(%ld): blocking due to io flush",
- session->id);
- status = h2_conn_io_flush(&session->io);
- if (status != APR_SUCCESS) {
- return h2_session_status_from_apr_status(status);
- }
- return NGHTTP2_ERR_WOULDBLOCK;
- }
-
status = h2_conn_io_write(&session->io, (const char *)data, length);
if (status == APR_SUCCESS) {
return length;
@@ -569,6 +559,16 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
return 0;
}
+static int h2_session_continue_data(h2_session *session) {
+ if (h2_mplx_has_master_events(session->mplx)) {
+ return 0;
+ }
+ if (h2_conn_io_needs_flush(&session->io)) {
+ return 0;
+ }
+ return 1;
+}
+
static char immortal_zeros[H2_MAX_PADLEN];
static int on_send_data_cb(nghttp2_session *ngh2,
@@ -589,17 +589,10 @@ static int on_send_data_cb(nghttp2_session *ngh2,
(void)ngh2;
(void)source;
- if (h2_conn_io_needs_flush(&session->io)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): blocking due to io flush",
- session->id, stream_id);
- status = h2_conn_io_flush(&session->io);
- if (status != APR_SUCCESS) {
- return h2_session_status_from_apr_status(status);
- }
+ if (!h2_session_continue_data(session)) {
return NGHTTP2_ERR_WOULDBLOCK;
}
-
+
if (frame->data.padlen > H2_MAX_PADLEN) {
return NGHTTP2_ERR_PROTO;
}
@@ -1418,8 +1411,6 @@ static apr_status_t h2_session_send(h2_session *session)
apr_socket_timeout_set(socket, session->s->timeout);
}
- /* This sends one round of frames from every able stream, plus
- * settings etc. if accumulated */
rv = nghttp2_session_send(session->ngh2);
if (socket) {
@@ -2058,7 +2049,11 @@ static apr_status_t dispatch_master(h2_session *session) {
status = h2_mplx_dispatch_master_events(session->mplx,
on_stream_resume, session);
- if (status != APR_SUCCESS) {
+ if (status == APR_EAGAIN) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c,
+ "h2_session(%ld): no master event available", session->id);
+ }
+ else if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c,
"h2_session(%ld): dispatch error", session->id);
dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
@@ -2118,6 +2113,14 @@ apr_status_t h2_session_process(h2_session *session, int async)
case H2_SESSION_ST_IDLE:
/* make certain, we send everything before we idle */
h2_conn_io_flush(&session->io);
+ /* We trust our connection into the default timeout/keepalive
+ * handling of the core filters/mpm iff:
+ * - keep_sync_until is not set
+ * - we have an async mpm
+ * - we have no open streams to process
+ * - we are not sitting on a Upgrade: request
+ * - we already have seen at least one request
+ */
if (!session->keep_sync_until && async && !session->open_streams
&& !session->r && session->remote.emitted_count) {
if (trace) {
@@ -2126,15 +2129,6 @@ apr_status_t h2_session_process(h2_session *session, int async)
"%d streams open", session->id,
session->open_streams);
}
- /* We do not return to the async mpm immediately, since under
- * load, mpms show the tendency to throw keep_alive connections
- * away very rapidly.
- * So, if we are still processing streams, we wait for the
- * normal timeout first and, on timeout, close.
- * If we have no streams, we still wait a short amount of
- * time here for the next frame to arrive, before handing
- * it to keep_alive processing of the mpm.
- */
status = h2_session_read(session, 0);
if (status == APR_SUCCESS) {
@@ -2219,7 +2213,6 @@ apr_status_t h2_session_process(h2_session *session, int async)
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "error");
}
}
-
break;
case H2_SESSION_ST_BUSY:
@@ -2244,13 +2237,16 @@ apr_status_t h2_session_process(h2_session *session, int async)
}
status = dispatch_master(session);
- if (status != APR_SUCCESS) {
+ if (status != APR_SUCCESS && status != APR_EAGAIN) {
break;
}
if (nghttp2_session_want_write(session->ngh2)) {
ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL);
status = h2_session_send(session);
+ if (status == APR_SUCCESS) {
+ status = h2_conn_io_flush(&session->io);
+ }
if (status != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
H2_ERR_INTERNAL_ERROR, "writing");
diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c
index 7cec844b22..7f919ab6e2 100644
--- a/modules/http2/h2_stream.c
+++ b/modules/http2/h2_stream.c
@@ -481,6 +481,7 @@ apr_status_t h2_stream_close_input(h2_stream *stream)
APR_BRIGADE_INSERT_TAIL(tmp, b);
status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
apr_brigade_destroy(tmp);
+ h2_beam_close(stream->input);
return status;
}
diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h
index 79d03a19a7..2821c6127e 100644
--- a/modules/http2/h2_version.h
+++ b/modules/http2/h2_version.h
@@ -26,7 +26,7 @@
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.8.7-DEV"
+#define MOD_HTTP2_VERSION "1.8.8-DEV"
/**
* @macro
@@ -34,7 +34,7 @@
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010807
+#define MOD_HTTP2_VERSION_NUM 0x010808
#endif /* mod_h2_h2_version_h */