diff options
author | Stefan Eissing <icing@apache.org> | 2022-04-13 08:38:12 +0000 |
---|---|---|
committer | Stefan Eissing <icing@apache.org> | 2022-04-13 08:38:12 +0000 |
commit | 5d3b2f1f0c37f672d313b60f5b7412ff3f7090e6 (patch) | |
tree | 55e7badf78a6b754fc5e70fcc1d3fb64cba544fc /modules/http2 | |
parent | fbb84e00fa53ca34b97f9acc4d024e512d4e6f23 (diff) | |
download | httpd-5d3b2f1f0c37f672d313b60f5b7412ff3f7090e6.tar.gz |
*) mod_http2: use the new REQUEST buckets to forward request
on secondary connections. Use the now generic
ap_process_connection() in h2 workers to process those.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1899802 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'modules/http2')
-rw-r--r-- | modules/http2/h2.h | 1 | ||||
-rw-r--r-- | modules/http2/h2_c2.c | 276 | ||||
-rw-r--r-- | modules/http2/h2_c2.h | 5 | ||||
-rw-r--r-- | modules/http2/h2_c2_filter.c | 37 | ||||
-rw-r--r-- | modules/http2/h2_c2_filter.h | 27 | ||||
-rw-r--r-- | modules/http2/h2_conn_ctx.h | 1 | ||||
-rw-r--r-- | modules/http2/h2_mplx.c | 46 | ||||
-rw-r--r-- | modules/http2/h2_mplx.h | 14 | ||||
-rw-r--r-- | modules/http2/h2_push.h | 2 | ||||
-rw-r--r-- | modules/http2/h2_request.c | 31 | ||||
-rw-r--r-- | modules/http2/h2_request.h | 2 | ||||
-rw-r--r-- | modules/http2/h2_session.c | 8 | ||||
-rw-r--r-- | modules/http2/h2_session.h | 4 | ||||
-rw-r--r-- | modules/http2/h2_stream.c | 112 | ||||
-rw-r--r-- | modules/http2/h2_stream.h | 6 | ||||
-rw-r--r-- | modules/http2/h2_switch.c | 31 | ||||
-rw-r--r-- | modules/http2/h2_workers.c | 38 |
17 files changed, 349 insertions, 292 deletions
diff --git a/modules/http2/h2.h b/modules/http2/h2.h index f707da41c1..179c55db23 100644 --- a/modules/http2/h2.h +++ b/modules/http2/h2.h @@ -188,7 +188,6 @@ struct h2_request { apr_table_t *headers; apr_time_t request_time; - unsigned int chunked : 1; /* iff request body needs to be forwarded as chunked */ apr_off_t raw_bytes; /* RAW network bytes that generated this request - if known. */ int http_status; /* Store a possible HTTP status code that gets * defined before creating the dummy HTTP/1.1 diff --git a/modules/http2/h2_c2.c b/modules/http2/h2_c2.c index 46d93e7ebe..f11a53cf25 100644 --- a/modules/http2/h2_c2.c +++ b/modules/http2/h2_c2.c @@ -57,6 +57,7 @@ static apr_socket_t *dummy_socket; static ap_filter_rec_t *c2_net_in_filter_handle; static ap_filter_rec_t *c2_net_out_filter_handle; +static ap_filter_rec_t *c2_request_in_filter_handle; static ap_filter_rec_t *c2_notes_out_filter_handle; @@ -165,12 +166,11 @@ static apr_status_t h2_c2_filter_in(ap_filter_t* f, apr_status_t status = APR_SUCCESS; apr_bucket *b; apr_off_t bblen; - const int trace1 = APLOGctrace1(f->c); - apr_size_t rmax = ((readbytes <= APR_SIZE_MAX)? + apr_size_t rmax = ((readbytes <= APR_SIZE_MAX)? (apr_size_t)readbytes : APR_SIZE_MAX); conn_ctx = h2_conn_ctx_get(f->c); - ap_assert(conn_ctx); + AP_DEBUG_ASSERT(conn_ctx); if (mode == AP_MODE_INIT) { return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes); @@ -180,8 +180,8 @@ static apr_status_t h2_c2_filter_in(ap_filter_t* f, return APR_ECONNABORTED; } - if (APLOGctrace1(f->c)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + if (APLOGctrace3(f->c)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, f->c, "h2_c2_in(%s-%d): read, mode=%d, block=%d, readbytes=%ld", conn_ctx->id, conn_ctx->stream_id, mode, block, (long)readbytes); } @@ -198,7 +198,7 @@ static apr_status_t h2_c2_filter_in(ap_filter_t* f, while (APR_BRIGADE_EMPTY(fctx->bb)) { /* Get more input data for our request. */ - if (APLOGctrace1(f->c)) { + if (APLOGctrace2(f->c)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c, "h2_c2_in(%s-%d): get more data from mplx, block=%d, " "readbytes=%ld", @@ -260,8 +260,8 @@ receive: return status; } - if (trace1) { - h2_util_bb_log(f->c, conn_ctx->stream_id, APLOG_TRACE2, + if (APLOGctrace3(f->c)) { + h2_util_bb_log(f->c, conn_ctx->stream_id, APLOG_TRACE3, "c2 input.bb", fctx->bb); } @@ -357,9 +357,27 @@ static apr_status_t beam_out(conn_rec *c2, h2_conn_ctx_t *conn_ctx, apr_bucket_b static apr_status_t h2_c2_filter_out(ap_filter_t* f, apr_bucket_brigade* bb) { h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(f->c); + apr_bucket *e; apr_status_t rv; ap_assert(conn_ctx); + if (!conn_ctx->has_final_response) { + for (e = APR_BRIGADE_FIRST(bb); + e != APR_BRIGADE_SENTINEL(bb); + e = APR_BUCKET_NEXT(e)) + { + if (AP_BUCKET_IS_RESPONSE(e)) { + ap_bucket_response *resp = e->data; + if (resp->status >= 200) { + conn_ctx->has_final_response = 1; + break; + } + } + if (APR_BUCKET_IS_EOS(e)) { + break; + } + } + } rv = beam_out(f->c, conn_ctx, bb); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, rv, f->c, @@ -371,8 +389,19 @@ static apr_status_t h2_c2_filter_out(ap_filter_t* f, apr_bucket_brigade* bb) return rv; } -static apr_status_t c2_run_pre_connection(conn_rec *c2, apr_socket_t *csd) +static int c2_hook_pre_connection(conn_rec *c2, void *csd) { + h2_conn_ctx_t *conn_ctx; + + if (!c2->master || !(conn_ctx = h2_conn_ctx_get(c2)) || !conn_ctx->stream_id) { + return DECLINED; + } + + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2, + "h2_c2(%s-%d), adding filters", + conn_ctx->id, conn_ctx->stream_id); + ap_add_input_filter_handle(c2_net_in_filter_handle, NULL, NULL, c2); + ap_add_output_filter_handle(c2_net_out_filter_handle, NULL, NULL, c2); if (c2->keepalives == 0) { /* Simulate that we had already a request on this connection. Some * hooks trigger special behaviour when keepalives is 0. @@ -388,169 +417,8 @@ static apr_status_t c2_run_pre_connection(conn_rec *c2, apr_socket_t *csd) * is unnecessary on a h2 stream. */ c2->keepalive = AP_CONN_CLOSE; - return ap_run_pre_connection(c2, csd); - } - ap_assert(c2->output_filters); - return APR_SUCCESS; -} - -apr_status_t h2_c2_process(conn_rec *c2, apr_thread_t *thread, int worker_id) -{ - h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2); - - ap_assert(conn_ctx); - ap_assert(conn_ctx->mplx); - - /* See the discussion at <https://github.com/icing/mod_h2/issues/195> - * - * Each conn_rec->id is supposed to be unique at a point in time. Since - * some modules (and maybe external code) uses this id as an identifier - * for the request_rec they handle, it needs to be unique for secondary - * connections also. - * - * The MPM module assigns the connection ids and mod_unique_id is using - * that one to generate identifier for requests. While the implementation - * works for HTTP/1.x, the parallel execution of several requests per - * connection will generate duplicate identifiers on load. - * - * The original implementation for secondary connection identifiers used - * to shift the master connection id up and assign the stream id to the - * lower bits. This was cramped on 32 bit systems, but on 64bit there was - * enough space. - * - * As issue 195 showed, mod_unique_id only uses the lower 32 bit of the - * connection id, even on 64bit systems. Therefore collisions in request ids. - * - * The way master connection ids are generated, there is some space "at the - * top" of the lower 32 bits on allmost all systems. If you have a setup - * with 64k threads per child and 255 child processes, you live on the edge. - * - * The new implementation shifts 8 bits and XORs in the worker - * id. This will experience collisions with > 256 h2 workers and heavy - * load still. There seems to be no way to solve this in all possible - * configurations by mod_h2 alone. - */ - c2->id = (c2->master->id << 8)^worker_id; - - if (!conn_ctx->pre_conn_done) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2, - "h2_c2(%s-%d), adding filters", - conn_ctx->id, conn_ctx->stream_id); - ap_add_input_filter_handle(c2_net_in_filter_handle, NULL, NULL, c2); - ap_add_output_filter_handle(c2_net_out_filter_handle, NULL, NULL, c2); - - c2_run_pre_connection(c2, ap_get_conn_socket(c2)); - conn_ctx->pre_conn_done = 1; - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2, - "h2_c2(%s-%d): process connection", - conn_ctx->id, conn_ctx->stream_id); - - c2->current_thread = thread; - ap_run_process_connection(c2); - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2, - "h2_c2(%s-%d): processing done", - conn_ctx->id, conn_ctx->stream_id); - - return APR_SUCCESS; -} - -static apr_status_t c2_process(h2_conn_ctx_t *conn_ctx, conn_rec *c) -{ - const h2_request *req = conn_ctx->request; - conn_state_t *cs = c->cs; - request_rec *r; - - r = h2_create_request_rec(conn_ctx->request, c); - if (!r) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_c2(%s-%d): create request_rec failed, r=NULL", - conn_ctx->id, conn_ctx->stream_id); - goto cleanup; - } - if (r->status != HTTP_OK) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_c2(%s-%d): create request_rec failed, r->status=%d", - conn_ctx->id, conn_ctx->stream_id, r->status); - goto cleanup; - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_c2(%s-%d): created request_rec for %s", - conn_ctx->id, conn_ctx->stream_id, r->the_request); - conn_ctx->server = r->server; - - /* the request_rec->server carries the timeout value that applies */ - h2_conn_ctx_set_timeout(conn_ctx, r->server->timeout); - /* We only handle this one request on the connection and tell everyone - * that there is no need to keep it "clean" if something fails. Also, - * this prevents mod_reqtimeout from doing funny business with monitoring - * keepalive timeouts. - */ - r->connection->keepalive = AP_CONN_CLOSE; - - if (conn_ctx->beam_in && !apr_table_get(r->headers_in, "Content-Length")) { - r->body_indeterminate = 1; - } - - if (h2_config_sgeti(conn_ctx->server, H2_CONF_COPY_FILES)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_mplx(%s-%d): copy_files in output", - conn_ctx->id, conn_ctx->stream_id); - h2_beam_set_copy_files(conn_ctx->beam_out, 1); - } - - ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, r); - if (cs) { - cs->state = CONN_STATE_HANDLER; - } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_c2(%s-%d): start process_request", - conn_ctx->id, conn_ctx->stream_id); - - /* Add the raw bytes of the request (e.g. header frame lengths to - * the logio for this request. */ - if (req->raw_bytes && h2_c_logio_add_bytes_in) { - h2_c_logio_add_bytes_in(c, req->raw_bytes); } - - ap_process_request(r); - /* After the call to ap_process_request, the - * request pool may have been deleted. */ - r = NULL; - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_c2(%s-%d): process_request done", - conn_ctx->id, conn_ctx->stream_id); - if (cs) - cs->state = CONN_STATE_WRITE_COMPLETION; - -cleanup: - return APR_SUCCESS; -} - -static int h2_c2_hook_process(conn_rec* c) -{ - h2_conn_ctx_t *ctx; - - if (!c->master) { - return DECLINED; - } - - ctx = h2_conn_ctx_get(c); - if (ctx->stream_id) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_h2, processing request directly"); - c2_process(ctx, c); - return DONE; - } - else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "secondary_conn(%ld): no h2 stream assing?", c->id); - } - return DECLINED; + return OK; } static void check_push(request_rec *r, const char *tag) @@ -580,21 +448,58 @@ static void check_push(request_rec *r, const char *tag) } } -static int h2_c2_hook_post_read_request(request_rec *r) +static void c2_pre_read_request(request_rec *r, conn_rec *c2) +{ + h2_conn_ctx_t *conn_ctx; + + if (!c2->master || !(conn_ctx = h2_conn_ctx_get(c2)) || !conn_ctx->stream_id) { + return; + } + ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r, + "h2_c2(%s-%d): adding request filters", + conn_ctx->id, conn_ctx->stream_id); + ap_add_input_filter_handle(c2_request_in_filter_handle, NULL, r, r->connection); + ap_add_output_filter_handle(c2_notes_out_filter_handle, NULL, r, r->connection); +} + +static int c2_post_read_request(request_rec *r) { - h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(r->connection); + h2_conn_ctx_t *conn_ctx; + conn_rec *c2 = r->connection; - if (conn_ctx && conn_ctx->stream_id) { + if (!c2->master || !(conn_ctx = h2_conn_ctx_get(c2)) || !conn_ctx->stream_id) { + return DECLINED; + } + /* Now that the request_rec is fully initialized, set relevant params */ + conn_ctx->server = r->server; + h2_conn_ctx_set_timeout(conn_ctx, r->server->timeout); + /* We only handle this one request on the connection and tell everyone + * that there is no need to keep it "clean" if something fails. Also, + * this prevents mod_reqtimeout from doing funny business with monitoring + * keepalive timeouts. + */ + r->connection->keepalive = AP_CONN_CLOSE; - ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r, - "h2_c2(%s-%d): adding request filters", + if (conn_ctx->beam_in && !apr_table_get(r->headers_in, "Content-Length")) { + r->body_indeterminate = 1; + } + + if (h2_config_sgeti(conn_ctx->server, H2_CONF_COPY_FILES)) { + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, + "h2_mplx(%s-%d): copy_files in output", conn_ctx->id, conn_ctx->stream_id); - ap_add_output_filter_handle(c2_notes_out_filter_handle, NULL, r, r->connection); + h2_beam_set_copy_files(conn_ctx->beam_out, 1); } - return DECLINED; + + /* Add the raw bytes of the request (e.g. header frame lengths to + * the logio for this request. */ + if (conn_ctx->request->raw_bytes && h2_c_logio_add_bytes_in) { + h2_c_logio_add_bytes_in(c2, conn_ctx->request->raw_bytes); + } + return OK; } -static int h2_c2_hook_fixups(request_rec *r) +static int c2_hook_fixups(request_rec *r) { conn_rec *c2 = r->connection; h2_conn_ctx_t *conn_ctx; @@ -613,12 +518,14 @@ void h2_c2_register_hooks(void) /* When the connection processing actually starts, we might * take over, if the connection is for a h2 stream. */ - ap_hook_process_connection(h2_c2_hook_process, - NULL, NULL, APR_HOOK_FIRST); + ap_hook_pre_connection(c2_hook_pre_connection, + NULL, NULL, APR_HOOK_MIDDLE); + /* We need to manipulate the standard HTTP/1.1 protocol filters and * install our own. This needs to be done very early. */ - ap_hook_post_read_request(h2_c2_hook_post_read_request, NULL, NULL, APR_HOOK_REALLY_FIRST); - ap_hook_fixups(h2_c2_hook_fixups, NULL, NULL, APR_HOOK_LAST); + ap_hook_pre_read_request(c2_pre_read_request, NULL, NULL, APR_HOOK_MIDDLE); + ap_hook_post_read_request(c2_post_read_request, NULL, NULL, APR_HOOK_REALLY_FIRST); + ap_hook_fixups(c2_hook_fixups, NULL, NULL, APR_HOOK_LAST); c2_net_in_filter_handle = ap_register_input_filter("H2_C2_NET_IN", h2_c2_filter_in, @@ -626,6 +533,9 @@ void h2_c2_register_hooks(void) c2_net_out_filter_handle = ap_register_output_filter("H2_C2_NET_OUT", h2_c2_filter_out, NULL, AP_FTYPE_NETWORK); + c2_request_in_filter_handle = + ap_register_input_filter("H2_C2_REQUEST_IN", h2_c2_filter_request_in, + NULL, AP_FTYPE_PROTOCOL); c2_notes_out_filter_handle = ap_register_output_filter("H2_C2_NOTES_OUT", h2_c2_filter_notes_out, NULL, AP_FTYPE_PROTOCOL); diff --git a/modules/http2/h2_c2.h b/modules/http2/h2_c2.h index cfe9755fcc..ac0da503a0 100644 --- a/modules/http2/h2_c2.h +++ b/modules/http2/h2_c2.h @@ -38,11 +38,6 @@ void h2_c2_destroy(conn_rec *c2); */ void h2_c2_abort(conn_rec *c2, conn_rec *from); -/** - * Process a secondary connection for a HTTP/2 stream request. - */ -apr_status_t h2_c2_process(conn_rec *c, apr_thread_t *thread, int worker_id); - void h2_c2_register_hooks(void); #endif /* defined(__mod_h2__h2_c2__) */ diff --git a/modules/http2/h2_c2_filter.c b/modules/http2/h2_c2_filter.c index 22299c4794..ed2bc3661d 100644 --- a/modules/http2/h2_c2_filter.c +++ b/modules/http2/h2_c2_filter.c @@ -87,3 +87,40 @@ apr_status_t h2_c2_filter_notes_out(ap_filter_t *f, apr_bucket_brigade *bb) pass: return ap_pass_brigade(f->next, bb); } + +apr_status_t h2_c2_filter_request_in(ap_filter_t *f, + apr_bucket_brigade *bb, + ap_input_mode_t mode, + apr_read_type_e block, + apr_off_t readbytes) +{ + h2_conn_ctx_t *conn_ctx; + apr_bucket *b; + + /* just get out of the way for things we don't want to handle. */ + if (mode != AP_MODE_READBYTES && mode != AP_MODE_GETLINE) { + return ap_get_brigade(f->next, bb, mode, block, readbytes); + } + + /* This filter is a one-time wonder */ + ap_remove_input_filter(f); + + if (f->c->master && (conn_ctx = h2_conn_ctx_get(f->c)) && conn_ctx->stream_id) { + if (conn_ctx->request->http_status != H2_HTTP_STATUS_UNSET) { + /* error was encountered preparing this request */ + b = ap_bucket_error_create(conn_ctx->request->http_status, NULL, f->r->pool, + f->c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, b); + return APR_SUCCESS; + } + b = h2_request_create_bucket(conn_ctx->request, f->r); + APR_BRIGADE_INSERT_TAIL(bb, b); + if (!conn_ctx->beam_in) { + b = apr_bucket_eos_create(f->c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, b); + } + return APR_SUCCESS; + } + + return ap_get_brigade(f->next, bb, mode, block, readbytes); +} diff --git a/modules/http2/h2_c2_filter.h b/modules/http2/h2_c2_filter.h index 13de7d6eca..c00fd2ae15 100644 --- a/modules/http2/h2_c2_filter.h +++ b/modules/http2/h2_c2_filter.h @@ -18,21 +18,20 @@ #define __mod_h2__h2_c2_filter__ /** - * h2_from_h1 parses a HTTP/1.1 response into - * - response status - * - a list of header values - * - a series of bytes that represent the response body alone, without - * any meta data, such as inserted by chunked transfer encoding. - * - * All data is allocated from the stream memory pool. - * - * Again, see comments in h2_request: ideally we would take the headers - * and status from the httpd structures instead of parsing them here, but - * we need to have all handlers and filters involved in request/response - * processing, so this seems to be the way for now. + * Output filter that inspects the request_rec->notes of the request + * itself and possible internal redirects to detect conditions that + * merit specific HTTP/2 response codes, such as 421. */ -struct h2_response_parser; - apr_status_t h2_c2_filter_notes_out(ap_filter_t *f, apr_bucket_brigade *bb); +/** + * Input filter on secondary connections that insert the REQUEST bucket + * with the request to perform and then removes itself. + */ +apr_status_t h2_c2_filter_request_in(ap_filter_t *f, + apr_bucket_brigade *bb, + ap_input_mode_t mode, + apr_read_type_e block, + apr_off_t readbytes); + #endif /* defined(__mod_h2__h2_c2_filter__) */ diff --git a/modules/http2/h2_conn_ctx.h b/modules/http2/h2_conn_ctx.h index ef62702045..b3c798c9be 100644 --- a/modules/http2/h2_conn_ctx.h +++ b/modules/http2/h2_conn_ctx.h @@ -43,7 +43,6 @@ struct h2_conn_ctx_t { struct h2_mplx *mplx; /* c2: the multiplexer */ struct h2_c2_transit *transit; /* c2: transit pool and bucket_alloc */ - int pre_conn_done; /* has pre_connection setup run? */ int stream_id; /* c1: 0, c2: stream id processed */ apr_pool_t *req_pool; /* c2: a c2 child pool for a request */ const struct h2_request *request; /* c2: the request to process */ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index e14dd382e9..c15b23ffad 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -715,27 +715,6 @@ void h2_mplx_c1_process(h2_mplx *m, H2_MPLX_LEAVE(m); } -apr_status_t h2_mplx_c1_fwd_input(h2_mplx *m, struct h2_iqueue *input_pending, - h2_stream_get_fn *get_stream, - struct h2_session *session) -{ - int sid; - - H2_MPLX_ENTER(m); - - while ((sid = h2_iq_shift(input_pending)) > 0) { - h2_stream *stream = get_stream(session, sid); - if (stream) { - H2_MPLX_LEAVE(m); - h2_stream_flush_input(stream); - H2_MPLX_ENTER(m); - } - } - - H2_MPLX_LEAVE(m); - return APR_SUCCESS; -} - static void c2_beam_input_write_notify(void *ctx, h2_bucket_beam *beam) { conn_rec *c = ctx; @@ -1083,6 +1062,31 @@ apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id) return status; } +apr_status_t h2_mplx_c1_input_closed(h2_mplx *m, int stream_id) +{ + h2_stream *stream; + h2_conn_ctx_t *c2_ctx; + apr_status_t status = APR_EAGAIN; + + H2_MPLX_ENTER_ALWAYS(m); + stream = h2_ihash_get(m->streams, stream_id); + if (stream && (c2_ctx = h2_conn_ctx_get(stream->c2))) { + if (c2_ctx->beam_in) { + apr_bucket_brigade *tmp =apr_brigade_create( + stream->pool, m->c1->bucket_alloc); + apr_bucket *eos = apr_bucket_eos_create(m->c1->bucket_alloc); + apr_off_t written; + + APR_BRIGADE_INSERT_TAIL(tmp, eos); + status = h2_beam_send(c2_ctx->beam_in, m->c1, + tmp, APR_BLOCK_READ, &written); + apr_brigade_destroy(tmp); + } + } + H2_MPLX_LEAVE(m); + return status; +} + static apr_status_t mplx_pollset_create(h2_mplx *m) { /* stream0 output only */ diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index b33545fa9c..4091d53e2d 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -153,11 +153,6 @@ void h2_mplx_c1_process(h2_mplx *m, struct h2_session *session, int *pstream_count); -apr_status_t h2_mplx_c1_fwd_input(h2_mplx *m, struct h2_iqueue *input_pending, - h2_stream_get_fn *get_stream, - struct h2_session *session); - - /** * Stream priorities have changed, reschedule pending requests. * @@ -200,6 +195,15 @@ apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx) apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id); /** + * Input for stream has been closed. Notify a possibly started + * and waiting stream by sending an EOS. + * @param m the mplx + * @param stream_id the closed stream + * @return APR_SUCCESS iff EOS was sent, APR_EAGAIN if not necessary + */ +apr_status_t h2_mplx_c1_input_closed(h2_mplx *m, int stream_id); + +/** * Get readonly access to a stream for a secondary connection. */ const struct h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id); diff --git a/modules/http2/h2_push.h b/modules/http2/h2_push.h index 008a74191b..d514bbffed 100644 --- a/modules/http2/h2_push.h +++ b/modules/http2/h2_push.h @@ -17,6 +17,8 @@ #ifndef __mod_h2__h2_push__ #define __mod_h2__h2_push__ +#include <http_protocol.h> + #include "h2.h" struct h2_request; diff --git a/modules/http2/h2_request.c b/modules/http2/h2_request.c index aa5f764a98..aa54351969 100644 --- a/modules/http2/h2_request.c +++ b/modules/http2/h2_request.c @@ -205,13 +205,9 @@ apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos, apr_table_setn(req->headers, "Host", req->authority); } - s = apr_table_get(req->headers, "Content-Length"); - if (!s) { - /* HTTP/2 does not need a Content-Length for framing, but our - * internal request processing is used to HTTP/1.1, so we - * need to either add a Content-Length or a Transfer-Encoding - * if any content can be expected. */ - if (eos && apr_table_get(req->headers, "Content-Type")) { + if (eos) { + s = apr_table_get(req->headers, "Content-Length"); + if (!s && apr_table_get(req->headers, "Content-Type")) { /* If we have a content-type, but already seen eos, no more * data will come. Signal a zero content length explicitly. */ @@ -291,6 +287,27 @@ static request_rec *my_ap_create_request(conn_rec *c) } #endif +apr_bucket *h2_request_create_bucket(const h2_request *req, request_rec *r) +{ + conn_rec *c = r->connection; + apr_table_t *headers = apr_table_copy(r->pool, req->headers); + const char *uri = req->path; + + AP_DEBUG_ASSERT(req->authority); + if (req->scheme && (ap_cstr_casecmp(req->scheme, + ap_ssl_conn_is_ssl(c->master? c->master : c)? "https" : "http") + || !ap_cstr_casecmp("CONNECT", req->method))) { + /* Client sent a non-matching ':scheme' pseudo header or CONNECT. + * In this case, we use an absolute URI. + */ + uri = apr_psprintf(r->pool, "%s://%s%s", + req->scheme, req->authority, req->path ? req->path : ""); + } + + return ap_bucket_request_create(req->method, uri, "HTTP/2.0", headers, + r->pool, c->bucket_alloc); +} + request_rec *h2_create_request_rec(const h2_request *req, conn_rec *c) { int access_status = HTTP_OK; diff --git a/modules/http2/h2_request.h b/modules/http2/h2_request.h index 0fc207cba3..6c0a468937 100644 --- a/modules/http2/h2_request.h +++ b/modules/http2/h2_request.h @@ -49,5 +49,7 @@ h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src); */ request_rec *h2_create_request_rec(const h2_request *req, conn_rec *conn); +apr_bucket *h2_request_create_bucket(const h2_request *req, request_rec *r); + #endif /* defined(__mod_h2__h2_request__) */ diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 273e13f708..117a863267 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -879,7 +879,6 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec * session->max_stream_count = h2_config_sgeti(s, H2_CONF_MAX_STREAMS); session->max_stream_mem = h2_config_sgeti(s, H2_CONF_STREAM_MAX_MEM); - session->in_pending = h2_iq_create(session->pool, (int)session->max_stream_count); session->out_c1_blocked = h2_iq_create(session->pool, (int)session->max_stream_count); session->ready_to_process = h2_iq_create(session->pool, (int)session->max_stream_count); @@ -1635,7 +1634,7 @@ static void on_stream_event(void *ctx, h2_stream *stream, h2_stream_event_t ev) h2_session *session = ctx; switch (ev) { case H2_SEV_IN_DATA_PENDING: - h2_iq_append(session->in_pending, stream->id); + session->input_flushed = 1; break; case H2_SEV_OUT_C1_BLOCK: h2_iq_append(session->out_c1_blocked, stream->id); @@ -1787,10 +1786,9 @@ apr_status_t h2_session_process(h2_session *session, int async) transit(session, "scheduled stream", H2_SESSION_ST_BUSY); } - if (!h2_iq_empty(session->in_pending)) { - h2_mplx_c1_fwd_input(session->mplx, session->in_pending, - get_stream, session); + if (session->input_flushed) { transit(session, "forwarded input", H2_SESSION_ST_BUSY); + session->input_flushed = 0; } if (!h2_iq_empty(session->out_c1_blocked)) { diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index 76c8ef6b87..c9f2cc50ee 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -112,8 +112,8 @@ typedef struct h2_session { char status[64]; /* status message for scoreboard */ int last_status_code; /* the one already reported */ const char *last_status_msg; /* the one already reported */ - - struct h2_iqueue *in_pending; /* all streams with input pending */ + + int input_flushed; /* stream input was flushed */ struct h2_iqueue *out_c1_blocked; /* all streams with output blocked on c1 buffer full */ struct h2_iqueue *ready_to_process; /* all streams ready for processing */ diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 9b879e923a..ea75ec21fc 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -211,6 +211,30 @@ cleanup: return APR_SUCCESS; } +static apr_status_t input_flush(h2_stream *stream) +{ + apr_status_t status = APR_SUCCESS; + apr_off_t written; + + if (!stream->in_buffer) goto cleanup; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c1, + H2_STRM_MSG(stream, "flush input")); + if (!stream->input) { + h2_stream_setup_input(stream); + } + status = h2_beam_send(stream->input, stream->session->c1, + stream->in_buffer, APR_BLOCK_READ, &written); + stream->in_last_write = apr_time_now(); + if (APR_SUCCESS != status && stream->state == H2_SS_CLOSED_L) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c1, + H2_STRM_MSG(stream, "send input error")); + h2_stream_dispatch(stream, H2_SEV_IN_ERROR); + } +cleanup: + return status; +} + static void input_append_bucket(h2_stream *stream, apr_bucket *b) { if (!stream->in_buffer) { @@ -252,11 +276,18 @@ static apr_status_t close_input(h2_stream *stream) } stream->input_closed = 1; - if (stream->in_buffer || stream->input) { + if (stream->in_buffer) { b = apr_bucket_eos_create(c->bucket_alloc); input_append_bucket(stream, b); + input_flush(stream); h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING); } + else { + rv = h2_mplx_c1_input_closed(stream->session->mplx, stream->id); + if (APR_SUCCESS == rv) { + h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING); + } + } cleanup: return rv; } @@ -472,29 +503,6 @@ leave: return status; } -apr_status_t h2_stream_flush_input(h2_stream *stream) -{ - apr_status_t status = APR_SUCCESS; - apr_off_t written; - - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c1, - H2_STRM_MSG(stream, "flush input")); - if (stream->in_buffer && !APR_BRIGADE_EMPTY(stream->in_buffer)) { - if (!stream->input) { - h2_stream_setup_input(stream); - } - status = h2_beam_send(stream->input, stream->session->c1, - stream->in_buffer, APR_BLOCK_READ, &written); - stream->in_last_write = apr_time_now(); - if (APR_SUCCESS != status && stream->state == H2_SS_CLOSED_L) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c1, - H2_STRM_MSG(stream, "send input error")); - h2_stream_dispatch(stream, H2_SEV_IN_ERROR); - } - } - return status; -} - apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags, const uint8_t *data, size_t len) { @@ -515,6 +523,7 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags, } stream->in_data_octets += len; input_append_data(stream, (const char*)data, len); + input_flush(stream); h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING); } return status; @@ -858,6 +867,20 @@ cleanup: if (APR_SUCCESS == status) { stream->request = req; stream->rtmp = NULL; + + if (APLOGctrace4(stream->session->c1)) { + int i; + const apr_array_header_t *t_h = apr_table_elts(req->headers); + const apr_table_entry_t *t_elt = (apr_table_entry_t *)t_h->elts; + ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, stream->session->c1, + H2_STRM_MSG(stream,"headers received from client:")); + for (i = 0; i < t_h->nelts; i++, t_elt++) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, stream->session->c1, + H2_STRM_MSG(stream, " %s: %s"), + ap_escape_logitem(stream->pool, t_elt->key), + ap_escape_logitem(stream->pool, t_elt->val)); + } + } } return status; } @@ -911,13 +934,18 @@ static apr_status_t buffer_output_receive(h2_stream *stream) goto cleanup; } - H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre"); - rv = h2_beam_receive(stream->output, stream->session->c1, stream->out_buffer, - APR_NONBLOCK_READ, stream->session->max_stream_mem - buf_len); - if (APR_SUCCESS != rv) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1, - H2_STRM_MSG(stream, "out_buffer, receive unsuccessful")); - goto cleanup; + if (stream->output_eos) { + rv = APR_BRIGADE_EMPTY(stream->out_buffer)? APR_EOF : APR_SUCCESS; + } + else { + H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre"); + rv = h2_beam_receive(stream->output, stream->session->c1, stream->out_buffer, + APR_NONBLOCK_READ, stream->session->max_stream_mem - buf_len); + if (APR_SUCCESS != rv) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1, + H2_STRM_MSG(stream, "out_buffer, receive unsuccessful")); + goto cleanup; + } } /* get rid of buckets we have no need for */ @@ -930,6 +958,9 @@ static apr_status_t buffer_output_receive(h2_stream *stream) APR_BUCKET_REMOVE(b); apr_bucket_destroy(b); } + else if (APR_BUCKET_IS_EOS(b)) { + stream->output_eos = 1; + } } else if (b->length == 0) { /* zero length data */ APR_BUCKET_REMOVE(b); @@ -1281,7 +1312,7 @@ apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount) return APR_SUCCESS; } -static apr_off_t buffer_output_data_to_send(h2_stream *stream, int *peos) +static apr_off_t output_data_buffered(h2_stream *stream, int *peos) { /* How much data do we have in our buffers that we can write? */ apr_off_t buf_len = 0; @@ -1380,7 +1411,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, } /* How much data do we have in our buffers that we can write? */ - buf_len = buffer_output_data_to_send(stream, &eos); + buf_len = output_data_buffered(stream, &eos); if (buf_len < length && !eos) { /* read more? */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c1, @@ -1388,7 +1419,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, session->id, (int)stream_id, (long)length, (long)buf_len); rv = buffer_output_receive(stream); /* process all headers sitting at the buffer head. */ - while (APR_SUCCESS == rv) { + while (APR_SUCCESS == rv && !eos && !stream->sent_trailers) { rv = buffer_output_process_headers(stream); if (APR_SUCCESS != rv && APR_EAGAIN != rv) { ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c1, @@ -1396,17 +1427,20 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, "data_cb, error processing headers")); return NGHTTP2_ERR_CALLBACK_FAILURE; } - buf_len = buffer_output_data_to_send(stream, &eos); + buf_len = output_data_buffered(stream, &eos); } - if (APR_EOF == rv) { - eos = 1; - } - else if (APR_SUCCESS != rv && !APR_STATUS_IS_EAGAIN(rv)) { + if (APR_SUCCESS != rv && !APR_STATUS_IS_EAGAIN(rv)) { ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c1, H2_STRM_LOG(APLOGNO(02938), stream, "data_cb, reading data")); return NGHTTP2_ERR_CALLBACK_FAILURE; } + + if (stream->sent_trailers) { + AP_DEBUG_ASSERT(eos); + AP_DEBUG_ASSERT(buf_len == 0); + return NGHTTP2_ERR_DEFERRED; + } } if (buf_len > (apr_off_t)length) { diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index c9683c57b2..110c11efa3 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -17,6 +17,8 @@ #ifndef __mod_h2__h2_stream__ #define __mod_h2__h2_stream__ +#include <http_protocol.h> + #include "h2.h" /** @@ -26,7 +28,7 @@ * connection to the client. The h2_session writes to the h2_stream, * adding HEADERS and DATA and finally an EOS. When headers are done, * h2_stream is scheduled for handling, which is expected to produce - * RESPONSE buclets. + * RESPONSE buckets. */ struct h2_mplx; @@ -207,8 +209,6 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int frame_type, int flags, apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags, const uint8_t *data, size_t len); -apr_status_t h2_stream_flush_input(h2_stream *stream); - /** * Reset the stream. Stream write/reads will return errors afterwards. * diff --git a/modules/http2/h2_switch.c b/modules/http2/h2_switch.c index 12dd3e8abf..93327afd5b 100644 --- a/modules/http2/h2_switch.c +++ b/modules/http2/h2_switch.c @@ -125,6 +125,28 @@ static int h2_protocol_propose(conn_rec *c, request_rec *r, return proposed? DECLINED : OK; } +static void remove_output_filters_below(ap_filter_t *f, ap_filter_type ftype) +{ + ap_filter_t *fnext; + + while (f && f->frec->ftype < ftype) { + fnext = f->next; + ap_remove_output_filter(f); + f = fnext; + } +} + +static void remove_input_filters_below(ap_filter_t *f, ap_filter_type ftype) +{ + ap_filter_t *fnext; + + while (f && f->frec->ftype < ftype) { + fnext = f->next; + ap_remove_input_filter(f); + f = fnext; + } +} + static int h2_protocol_switch(conn_rec *c, request_rec *r, server_rec *s, const char *protocol) { @@ -155,11 +177,12 @@ static int h2_protocol_switch(conn_rec *c, request_rec *r, server_rec *s, /* Switching in the middle of a request means that * we have to send out the response to this one in h2 * format. So we need to take over the connection - * right away. + * and remove all old filters with type up to the + * CONNEDCTION/NETWORK ones. */ - ap_remove_input_filter_byhandle(r->input_filters, "http_in"); - ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER"); - + remove_input_filters_below(r->input_filters, AP_FTYPE_CONNECTION); + remove_output_filters_below(r->output_filters, AP_FTYPE_CONNECTION); + /* Ok, start an h2_conn on this one. */ status = h2_c1_setup(c, r, s); diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index 774122b9e4..ec0889d234 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -21,6 +21,7 @@ #include <mpm_common.h> #include <httpd.h> +#include <http_connection.h> #include <http_core.h> #include <http_log.h> #include <http_protocol.h> @@ -256,8 +257,41 @@ static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx) /* Get the next c2 from mplx to process. */ while (get_next(slot)) { - ap_assert(slot->connection != NULL); - h2_c2_process(slot->connection, thread, slot->id); + /* See the discussion at <https://github.com/icing/mod_h2/issues/195> + * + * Each conn_rec->id is supposed to be unique at a point in time. Since + * some modules (and maybe external code) uses this id as an identifier + * for the request_rec they handle, it needs to be unique for secondary + * connections also. + * + * The MPM module assigns the connection ids and mod_unique_id is using + * that one to generate identifier for requests. While the implementation + * works for HTTP/1.x, the parallel execution of several requests per + * connection will generate duplicate identifiers on load. + * + * The original implementation for secondary connection identifiers used + * to shift the master connection id up and assign the stream id to the + * lower bits. This was cramped on 32 bit systems, but on 64bit there was + * enough space. + * + * As issue 195 showed, mod_unique_id only uses the lower 32 bit of the + * connection id, even on 64bit systems. Therefore collisions in request ids. + * + * The way master connection ids are generated, there is some space "at the + * top" of the lower 32 bits on allmost all systems. If you have a setup + * with 64k threads per child and 255 child processes, you live on the edge. + * + * The new implementation shifts 8 bits and XORs in the worker + * id. This will experience collisions with > 256 h2 workers and heavy + * load still. There seems to be no way to solve this in all possible + * configurations by mod_h2 alone. + */ + AP_DEBUG_ASSERT(slot->connection != NULL); + slot->connection->id = (slot->connection->master->id << 8)^slot->id; + slot->connection->current_thread = thread; + + ap_process_connection(slot->connection, ap_get_conn_socket(slot->connection)); + h2_mplx_worker_c2_done(slot->connection); slot->connection = NULL; } |