diff options
author | Stefan Eissing <icing@apache.org> | 2016-10-03 11:47:45 +0000 |
---|---|---|
committer | Stefan Eissing <icing@apache.org> | 2016-10-03 11:47:45 +0000 |
commit | 2d12cf2d7a9635961cc3c46cfa7921da9c83d14c (patch) | |
tree | 0b2b0b626c7ad316414f65a1e44bdfbaf32d1104 | |
parent | 6099655506a8a862101d7ee3b9df3bdf399ebef1 (diff) | |
download | httpd-2d12cf2d7a9635961cc3c46cfa7921da9c83d14c.tar.gz |
various fixes, mod_cgid interop, response/trailer forwarding rewritten, stability
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1763158 13f79535-47bb-0310-9956-ffa450edef68
39 files changed, 1443 insertions, 1626 deletions
@@ -1,6 +1,14 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.0 + *) mod_http2: rewrite of how responses and trailers are transferred between + master and slave connection. Reduction of internal states for tasks + and streams, stability. Heuristic id generation for slave connections + to better keep promise of connection ids unique at given point int time. + Fix for mod_cgid interop in high load situtations. + Fix for handling of incoming trailers when no request body is sent. + [Stefan Eissing] + *) event: Avoid listener periodic wake ups by using the pollset wake-ability when available. PR 57399. [Yann Ylavic, Luca Toscano] diff --git a/CMakeLists.txt b/CMakeLists.txt index bfd40cb869..446c1f4e71 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -434,7 +434,7 @@ SET(mod_http2_extra_sources modules/http2/h2_from_h1.c modules/http2/h2_h2.c modules/http2/h2_bucket_beam.c modules/http2/h2_mplx.c modules/http2/h2_push.c - modules/http2/h2_request.c modules/http2/h2_response.c + modules/http2/h2_request.c modules/http2/h2_headers.c modules/http2/h2_session.c modules/http2/h2_stream.c modules/http2/h2_switch.c modules/http2/h2_ngn_shed.c modules/http2/h2_task.c modules/http2/h2_util.c diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2 index 40f7181f25..2360e40f56 100644 --- a/modules/http2/NWGNUmod_http2 +++ b/modules/http2/NWGNUmod_http2 @@ -199,7 +199,7 @@ FILES_nlm_objs = \ $(OBJDIR)/h2_ngn_shed.o \ $(OBJDIR)/h2_push.o \ $(OBJDIR)/h2_request.o \ - $(OBJDIR)/h2_response.o \ + $(OBJDIR)/h2_headers.o \ $(OBJDIR)/h2_session.o \ $(OBJDIR)/h2_stream.o \ $(OBJDIR)/h2_switch.o \ diff --git a/modules/http2/config2.m4 b/modules/http2/config2.m4 index cccbbc8536..ceb183533d 100644 --- a/modules/http2/config2.m4 +++ b/modules/http2/config2.m4 @@ -30,11 +30,11 @@ h2_ctx.lo dnl h2_filter.lo dnl h2_from_h1.lo dnl h2_h2.lo dnl +h2_headers.lo dnl h2_mplx.lo dnl h2_ngn_shed.lo dnl h2_push.lo dnl h2_request.lo dnl -h2_response.lo dnl h2_session.lo dnl h2_stream.lo dnl h2_switch.lo dnl diff --git a/modules/http2/h2.h b/modules/http2/h2.h index b8eac1046c..03e6e3e623 100644 --- a/modules/http2/h2.h +++ b/modules/http2/h2.h @@ -47,6 +47,9 @@ extern const char *H2_MAGIC_TOKEN; #define H2_HEADER_PATH_LEN 5 #define H2_CRLF "\r\n" +/* Max data size to write so it fits inside a TLS record */ +#define H2_DATA_CHUNK_SIZE ((16*1024) - 100 - 9) + /* Maximum number of padding bytes in a frame, rfc7540 */ #define H2_MAX_PADLEN 256 /* Initial default window size, RFC 7540 ch. 6.5.2 */ @@ -115,38 +118,25 @@ typedef struct h2_session_props { typedef struct h2_request h2_request; struct h2_request { - apr_uint32_t id; /* stream id */ - apr_uint32_t initiated_on; /* initiating stream id (PUSH) or 0 */ - const char *method; /* pseudo header values, see ch. 8.1.2.3 */ const char *scheme; const char *authority; const char *path; apr_table_t *headers; - apr_table_t *trailers; apr_time_t request_time; - apr_off_t content_length; - unsigned int chunked : 1; /* iff requst body needs to be forwarded as chunked */ - unsigned int body : 1; /* iff this request has a body */ + unsigned int chunked : 1; /* iff requst body needs to be forwarded as chunked */ unsigned int serialize : 1; /* iff this request is written in HTTP/1.1 serialization */ - unsigned int push_policy; /* which push policy to use for this request */ }; -typedef struct h2_response h2_response; +typedef struct h2_headers h2_headers; -struct h2_response { - int stream_id; - int rst_error; - int http_status; - apr_off_t content_length; +struct h2_headers { + int status; apr_table_t *headers; - apr_table_t *trailers; - struct h2_response *next; - - const char *sos_filter; + apr_table_t *notes; }; typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_off_t len); @@ -155,7 +145,7 @@ typedef int h2_stream_pri_cmp(int stream_id1, int stream_id2, void *ctx); /* Note key to attach connection task id to conn_rec/request_rec instances */ -#define H2_TASK_ID_NOTE "http2-task-id" - +#define H2_TASK_ID_NOTE "http2-task-id" +#define H2_FILTER_DEBUG_NOTE "http2-debug" #endif /* defined(__mod_h2__h2__) */ diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 01347363ee..7860d26170 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -21,6 +21,7 @@ #include <apr_thread_cond.h> #include <httpd.h> +#include <http_protocol.h> #include <http_log.h> #include "h2_private.h" @@ -170,6 +171,14 @@ const apr_bucket_type_t h2_bucket_type_beam = { * h2_blist, a brigade without allocations ******************************************************************************/ +APR_HOOK_STRUCT( + APR_HOOK_LINK(beam_bucket) +) +AP_IMPLEMENT_HOOK_RUN_FIRST(apr_bucket *, beam_bucket, + (h2_bucket_beam *beam, apr_bucket_brigade *dest, + const apr_bucket *src), + (beam, dest, src), NULL) + apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax, const char *tag, const char *sep, h2_blist *bl) @@ -518,10 +527,12 @@ void h2_beam_abort(h2_bucket_beam *beam) h2_beam_lock bl; if (enter_yellow(beam, &bl) == APR_SUCCESS) { - r_purge_reds(beam); - h2_blist_cleanup(&beam->red); - beam->aborted = 1; - report_consumption(beam, 0); + if (!beam->aborted) { + beam->aborted = 1; + r_purge_reds(beam); + h2_blist_cleanup(&beam->red); + report_consumption(beam, 0); + } if (beam->m_cond) { apr_thread_cond_broadcast(beam->m_cond); } @@ -792,8 +803,10 @@ transfer: else if (APR_BUCKET_IS_FLUSH(bred)) { bgreen = apr_bucket_flush_create(bb->bucket_alloc); } - else { - /* put red into hold, no green sent out */ + else if (AP_BUCKET_IS_ERROR(bred)) { + ap_bucket_error *eb = (ap_bucket_error *)bred; + bgreen = ap_bucket_error_create(eb->status, eb->data, + bb->p, bb->bucket_alloc); } } else if (APR_BUCKET_IS_FILE(bred)) { @@ -846,6 +859,14 @@ transfer: remain -= bgreen->length; ++transferred; } + else { + bgreen = ap_run_beam_bucket(beam, bb, bred); + while (bgreen && bgreen != APR_BRIGADE_SENTINEL(bb)) { + ++transferred; + remain -= bgreen->length; + bgreen = APR_BUCKET_NEXT(bgreen); + } + } } if (readbytes > 0 && remain < 0) { diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h index 1e486e9a24..db46baacd6 100644 --- a/modules/http2/h2_bucket_beam.h +++ b/modules/http2/h2_bucket_beam.h @@ -373,4 +373,8 @@ int h2_beam_was_received(h2_bucket_beam *beam); apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam); +AP_DECLARE_HOOK(apr_bucket *, beam_bucket, + (h2_bucket_beam *beam, apr_bucket_brigade *dest, + const apr_bucket *src)) + #endif /* h2_bucket_beam_h */ diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index 4ddf1b7029..c2e8e0ef34 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -14,6 +14,7 @@ */ #include <assert.h> +#include <apr_strings.h> #include <ap_mpm.h> @@ -240,12 +241,13 @@ apr_status_t h2_conn_pre_close(struct h2_ctx *ctx, conn_rec *c) return status; } -conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, - apr_allocator_t *allocator) +conn_rec *h2_slave_create(conn_rec *master, apr_uint32_t slave_id, + apr_pool_t *parent, apr_allocator_t *allocator) { apr_pool_t *pool; conn_rec *c; void *cfg; + unsigned long l; AP_DEBUG_ASSERT(master); ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, master, @@ -271,8 +273,29 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, } memcpy(c, master, sizeof(conn_rec)); - - /* Replace these */ + + /* Each conn_rec->id is supposed to be unique at a point in time. Since + * some modules (and maybe external code) uses this id as an identifier + * for the request_rec they handle, it needs to be unique for slave + * connections also. + * The connection id is generated by the MPM and most MPMs use the formula + * id := (child_num * max_threads) + thread_num + * which means that there is a maximum id of about + * idmax := max_child_count * max_threads + * If we assume 2024 child processes with 2048 threads max, we get + * idmax ~= 2024 * 2048 = 2 ** 22 + * On 32 bit systems, we have not much space left, but on 64 bit systems + * (and higher?) we can use the upper 32 bits without fear of collision. + * 32 bits is just what we need, since a connection can only handle so + * many streams. + */ + l = master->id; + if (sizeof(long) >= 8 && l < APR_UINT32_MAX) { + c->id = l|(((unsigned long)slave_id) << 32); + } + else { + c->id = l^(~slave_id); + } c->master = master; c->pool = pool; c->conn_config = ap_create_conn_config(pool); @@ -284,7 +307,8 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, c->data_in_output_filters = 0; c->clogging_input_filters = 1; c->log = NULL; - c->log_id = NULL; + c->log_id = apr_psprintf(pool, "%ld-%d", + master->id, slave_id); /* Simulate that we had already a request on this connection. */ c->keepalives = 1; /* We cannot install the master connection socket on the slaves, as @@ -304,6 +328,9 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, ap_set_module_config(c->conn_config, h2_conn_mpm_module(), cfg); } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, + "h2_task: creating conn, master=%ld, sid=%ld, logid=%s", + master->id, c->id, c->log_id); return c; } diff --git a/modules/http2/h2_conn.h b/modules/http2/h2_conn.h index 84f7616858..4c2799696a 100644 --- a/modules/http2/h2_conn.h +++ b/modules/http2/h2_conn.h @@ -66,8 +66,8 @@ typedef enum { h2_mpm_type_t h2_conn_mpm_type(void); -conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, - apr_allocator_t *allocator); +conn_rec *h2_slave_create(conn_rec *master, apr_uint32_t slave_id, + apr_pool_t *parent, apr_allocator_t *allocator); void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator); apr_status_t h2_slave_run_pre_connection(conn_rec *slave, apr_socket_t *csd); diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index d72cbe4967..6ba24faa6f 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -120,8 +120,8 @@ static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level, line = *buffer? buffer : "(empty)"; } /* Intentional no APLOGNO */ - ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s", - c->id, stream_id, tag, line); + ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%s)-%s: %s", + c->log_id, tag, line); } diff --git a/modules/http2/h2_filter.c b/modules/http2/h2_filter.c index ce94b52ed6..6d9cee16f5 100644 --- a/modules/http2/h2_filter.c +++ b/modules/http2/h2_filter.c @@ -18,6 +18,7 @@ #include <apr_strings.h> #include <httpd.h> #include <http_core.h> +#include <http_protocol.h> #include <http_log.h> #include <http_connection.h> #include <scoreboard.h> @@ -32,7 +33,7 @@ #include "h2_task.h" #include "h2_stream.h" #include "h2_request.h" -#include "h2_response.h" +#include "h2_headers.h" #include "h2_stream.h" #include "h2_session.h" #include "h2_util.h" @@ -174,30 +175,92 @@ apr_status_t h2_filter_core_input(ap_filter_t* f, * http2 connection status handler + stream out source ******************************************************************************/ -static const char *H2_SOS_H2_STATUS = "http2-status"; +typedef struct { + apr_bucket_refcount refcount; + h2_bucket_event_cb *cb; + void *ctx; +} h2_bucket_observer; + +static apr_status_t bucket_read(apr_bucket *b, const char **str, + apr_size_t *len, apr_read_type_e block) +{ + (void)b; + (void)block; + *str = NULL; + *len = 0; + return APR_SUCCESS; +} -int h2_filter_h2_status_handler(request_rec *r) +static void bucket_destroy(void *data) { - h2_ctx *ctx = h2_ctx_rget(r); - h2_task *task; - - if (strcmp(r->handler, "http2-status")) { - return DECLINED; + h2_bucket_observer *h = data; + if (apr_bucket_shared_destroy(h)) { + if (h->cb) { + h->cb(h->ctx, H2_BUCKET_EV_BEFORE_DESTROY, NULL); + } + apr_bucket_free(h); } - if (r->method_number != M_GET) { - return DECLINED; +} + +apr_bucket * h2_bucket_observer_make(apr_bucket *b, h2_bucket_event_cb *cb, + void *ctx) +{ + h2_bucket_observer *br; + + br = apr_bucket_alloc(sizeof(*br), b->list); + br->cb = cb; + br->ctx = ctx; + + b = apr_bucket_shared_make(b, br, 0, 0); + b->type = &h2_bucket_type_observer; + return b; +} + +apr_bucket * h2_bucket_observer_create(apr_bucket_alloc_t *list, + h2_bucket_event_cb *cb, void *ctx) +{ + apr_bucket *b = apr_bucket_alloc(sizeof(*b), list); + + APR_BUCKET_INIT(b); + b->free = apr_bucket_free; + b->list = list; + b = h2_bucket_observer_make(b, cb, ctx); + return b; +} + +apr_status_t h2_bucket_observer_fire(apr_bucket *b, h2_bucket_event event) +{ + if (H2_BUCKET_IS_OBSERVER(b)) { + h2_bucket_observer *l = (h2_bucket_observer *)b->data; + return l->cb(l->ctx, event, b); } + return APR_EINVAL; +} - task = ctx? h2_ctx_get_task(ctx) : NULL; - if (task) { - /* We need to handle the actual output on the main thread, as - * we need to access h2_session information. */ - apr_table_setn(r->notes, H2_RESP_SOS_NOTE, H2_SOS_H2_STATUS); - apr_table_setn(r->headers_out, "Content-Type", "application/json"); - r->status = 200; - return DONE; +const apr_bucket_type_t h2_bucket_type_observer = { + "H2LAZY", 5, APR_BUCKET_METADATA, + bucket_destroy, + bucket_read, + apr_bucket_setaside_noop, + apr_bucket_split_notimpl, + apr_bucket_shared_copy +}; + +apr_bucket *h2_bucket_observer_beam(struct h2_bucket_beam *beam, + apr_bucket_brigade *dest, + const apr_bucket *src) +{ + if (H2_BUCKET_IS_OBSERVER(src)) { + h2_bucket_observer *l = (h2_bucket_observer *)src->data; + apr_bucket *b = h2_bucket_observer_create(dest->bucket_alloc, + l->cb, l->ctx); + APR_BRIGADE_INSERT_TAIL(dest, b); + l->cb = NULL; + l->ctx = NULL; + h2_bucket_observer_fire(b, H2_BUCKET_EV_BEFORE_MASTER_SEND); + return b; } - return DECLINED; + return NULL; } static apr_status_t bbout(apr_bucket_brigade *bb, const char *fmt, ...) @@ -337,31 +400,28 @@ static void add_stats(apr_bucket_brigade *bb, h2_session *s, bbout(bb, " }%s\n", last? "" : ","); } -static apr_status_t h2_status_stream_filter(h2_stream *stream) +static apr_status_t h2_status_insert(h2_task *task, apr_bucket *b) { - h2_session *s = stream->session; - conn_rec *c = s->c; + h2_mplx *m = task->mplx; + h2_stream *stream = h2_mplx_stream_get(m, task->stream_id); + h2_session *s; + conn_rec *c; + apr_bucket_brigade *bb; + apr_bucket *e; int32_t connFlowIn, connFlowOut; - if (!stream->response) { - return APR_EINVAL; - } - - if (!stream->buffer) { - stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc); + if (!stream) { + /* stream already done */ + return APR_SUCCESS; } - bb = stream->buffer; + s = stream->session; + c = s->c; - apr_table_unset(stream->response->headers, "Content-Length"); - stream->response->content_length = -1; + bb = apr_brigade_create(stream->pool, c->bucket_alloc); connFlowIn = nghttp2_session_get_effective_local_window_size(s->ngh2); connFlowOut = nghttp2_session_get_remote_window_size(s->ngh2); - apr_table_setn(stream->response->headers, "conn-flow-in", - apr_itoa(stream->pool, connFlowIn)); - apr_table_setn(stream->response->headers, "conn-flow-out", - apr_itoa(stream->pool, connFlowOut)); bbout(bb, "{\n"); bbout(bb, " \"version\": \"draft-01\",\n"); @@ -376,15 +436,96 @@ static apr_status_t h2_status_stream_filter(h2_stream *stream) add_stats(bb, s, stream, 1); bbout(bb, "}\n"); + while ((e = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) { + APR_BUCKET_REMOVE(e); + APR_BUCKET_INSERT_AFTER(b, e); + b = e; + } + apr_brigade_destroy(bb); + return APR_SUCCESS; } -apr_status_t h2_stream_filter(h2_stream *stream) +static apr_status_t status_event(void *ctx, h2_bucket_event event, + apr_bucket *b) { - const char *fname = stream->response? stream->response->sos_filter : NULL; - if (fname && !strcmp(H2_SOS_H2_STATUS, fname)) { - return h2_status_stream_filter(stream); + h2_task *task = ctx; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, task->c->master, + "status_event(%s): %d", task->id, event); + switch (event) { + case H2_BUCKET_EV_BEFORE_MASTER_SEND: + h2_status_insert(task, b); + break; + default: + break; } return APR_SUCCESS; } +int h2_filter_h2_status_handler(request_rec *r) +{ + h2_ctx *ctx = h2_ctx_rget(r); + conn_rec *c = r->connection; + h2_task *task; + apr_bucket_brigade *bb; + apr_bucket *b; + apr_status_t status; + + if (strcmp(r->handler, "http2-status")) { + return DECLINED; + } + if (r->method_number != M_GET && r->method_number != M_POST) { + return DECLINED; + } + + task = ctx? h2_ctx_get_task(ctx) : NULL; + if (task) { + + if ((status = ap_discard_request_body(r)) != OK) { + return status; + } + + /* We need to handle the actual output on the main thread, as + * we need to access h2_session information. */ + r->status = 200; + r->clength = -1; + r->chunked = 1; + apr_table_unset(r->headers_out, "Content-Length"); + ap_set_content_type(r, "application/json"); + apr_table_setn(r->notes, H2_FILTER_DEBUG_NOTE, "on"); + + bb = apr_brigade_create(r->pool, c->bucket_alloc); + b = h2_bucket_observer_create(c->bucket_alloc, status_event, task); + APR_BRIGADE_INSERT_TAIL(bb, b); + b = apr_bucket_eos_create(c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, b); + + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, + "status_handler(%s): checking for incoming trailers", + task->id); + if (r->trailers_in && !apr_is_empty_table(r->trailers_in)) { + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, + "status_handler(%s): seeing incoming trailers", + task->id); + apr_table_setn(r->trailers_out, "h2-trailers-in", + apr_itoa(r->pool, 1)); + } + + status = ap_pass_brigade(r->output_filters, bb); + if (status == APR_SUCCESS + || r->status != HTTP_OK + || c->aborted) { + return OK; + } + else { + /* no way to know what type of error occurred */ + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, + "status_handler(%s): ap_pass_brigade failed", + task->id); + return AP_FILTER_ERROR; + } + } + return DECLINED; +} + diff --git a/modules/http2/h2_filter.h b/modules/http2/h2_filter.h index 5ba7d1581b..b3e34cc5ba 100644 --- a/modules/http2/h2_filter.h +++ b/modules/http2/h2_filter.h @@ -16,6 +16,8 @@ #ifndef __mod_h2__h2_filter__ #define __mod_h2__h2_filter__ +struct h2_bucket_beam; +struct h2_headers; struct h2_stream; struct h2_session; @@ -43,9 +45,33 @@ apr_status_t h2_filter_core_input(ap_filter_t* filter, apr_read_type_e block, apr_off_t readbytes); -#define H2_RESP_SOS_NOTE "h2-sos-filter" +/******* observer bucket ******************************************************/ + +typedef enum { + H2_BUCKET_EV_BEFORE_DESTROY, + H2_BUCKET_EV_BEFORE_MASTER_SEND +} h2_bucket_event; + +extern const apr_bucket_type_t h2_bucket_type_observer; + +typedef apr_status_t h2_bucket_event_cb(void *ctx, h2_bucket_event event, apr_bucket *b); + +#define H2_BUCKET_IS_OBSERVER(e) (e->type == &h2_bucket_type_observer) + +apr_bucket * h2_bucket_observer_make(apr_bucket *b, h2_bucket_event_cb *cb, + void *ctx); + +apr_bucket * h2_bucket_observer_create(apr_bucket_alloc_t *list, + h2_bucket_event_cb *cb, void *ctx); + +apr_status_t h2_bucket_observer_fire(apr_bucket *b, h2_bucket_event event); + +apr_bucket *h2_bucket_observer_beam(struct h2_bucket_beam *beam, + apr_bucket_brigade *dest, + const apr_bucket *src); + +/******* /.well-known/h2/state handler ****************************************/ -apr_status_t h2_stream_filter(struct h2_stream *stream); int h2_filter_h2_status_handler(request_rec *r); #endif /* __mod_h2__h2_filter__ */ diff --git a/modules/http2/h2_from_h1.c b/modules/http2/h2_from_h1.c index 876ec58bfb..b7429dc7d4 100644 --- a/modules/http2/h2_from_h1.c +++ b/modules/http2/h2_from_h1.c @@ -28,190 +28,12 @@ #include <util_time.h> #include "h2_private.h" -#include "h2_response.h" +#include "h2_headers.h" #include "h2_from_h1.h" #include "h2_task.h" #include "h2_util.h" -static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state); - -h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool) -{ - h2_from_h1 *from_h1 = apr_pcalloc(pool, sizeof(h2_from_h1)); - if (from_h1) { - from_h1->stream_id = stream_id; - from_h1->pool = pool; - from_h1->state = H2_RESP_ST_STATUS_LINE; - from_h1->hlines = apr_array_make(pool, 10, sizeof(char *)); - } - return from_h1; -} - -static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state) -{ - if (from_h1->state != state) { - from_h1->state = state; - } -} - -h2_response *h2_from_h1_get_response(h2_from_h1 *from_h1) -{ - return from_h1->response; -} - -static apr_status_t make_h2_headers(h2_from_h1 *from_h1, request_rec *r) -{ - from_h1->response = h2_response_create(from_h1->stream_id, 0, - from_h1->http_status, - from_h1->hlines, - r->notes, - from_h1->pool); - from_h1->content_length = from_h1->response->content_length; - from_h1->chunked = r->chunked; - - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, APLOGNO(03197) - "h2_from_h1(%d): converted headers, content-length: %d" - ", chunked=%d", - from_h1->stream_id, (int)from_h1->content_length, - (int)from_h1->chunked); - - set_state(from_h1, ((from_h1->chunked || from_h1->content_length > 0)? - H2_RESP_ST_BODY : H2_RESP_ST_DONE)); - /* We are ready to be sent to the client */ - return APR_SUCCESS; -} - -static apr_status_t parse_header(h2_from_h1 *from_h1, ap_filter_t* f, - char *line) { - (void)f; - - if (line[0] == ' ' || line[0] == '\t') { - char **plast; - /* continuation line from the header before this */ - while (line[0] == ' ' || line[0] == '\t') { - ++line; - } - - plast = apr_array_pop(from_h1->hlines); - if (plast == NULL) { - /* not well formed */ - return APR_EINVAL; - } - APR_ARRAY_PUSH(from_h1->hlines, const char*) = apr_psprintf(from_h1->pool, "%s %s", *plast, line); - } - else { - /* new header line */ - APR_ARRAY_PUSH(from_h1->hlines, const char*) = apr_pstrdup(from_h1->pool, line); - } - return APR_SUCCESS; -} - -static apr_status_t get_line(h2_from_h1 *from_h1, apr_bucket_brigade *bb, - ap_filter_t* f, char *line, apr_size_t len) -{ - apr_status_t status; - if (!from_h1->bb) { - from_h1->bb = apr_brigade_create(from_h1->pool, f->c->bucket_alloc); - } - else { - apr_brigade_cleanup(from_h1->bb); - } - status = apr_brigade_split_line(from_h1->bb, bb, - APR_BLOCK_READ, - HUGE_STRING_LEN); - if (status == APR_SUCCESS) { - --len; - status = apr_brigade_flatten(from_h1->bb, line, &len); - if (status == APR_SUCCESS) { - /* we assume a non-0 containing line and remove - * trailing crlf. */ - line[len] = '\0'; - if (len >= 2 && !strcmp(H2_CRLF, line + len - 2)) { - len -= 2; - line[len] = '\0'; - } - - apr_brigade_cleanup(from_h1->bb); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_from_h1(%d): read line: %s", - from_h1->stream_id, line); - } - } - return status; -} - -apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1, ap_filter_t* f, - apr_bucket_brigade* bb) -{ - apr_status_t status = APR_SUCCESS; - char line[HUGE_STRING_LEN]; - - if ((from_h1->state == H2_RESP_ST_BODY) - || (from_h1->state == H2_RESP_ST_DONE)) { - if (from_h1->chunked) { - /* The httpd core HTTP_HEADER filter has or will install the - * "CHUNK" output transcode filter, which appears further down - * the filter chain. We do not want it for HTTP/2. - * Once we successfully deinstalled it, this filter has no - * further function and we remove it. - */ - status = ap_remove_output_filter_byhandle(f->r->output_filters, - "CHUNK"); - if (status == APR_SUCCESS) { - ap_remove_output_filter(f); - } - } - - return ap_pass_brigade(f->next, bb); - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_from_h1(%d): read_response", from_h1->stream_id); - - while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) { - - switch (from_h1->state) { - - case H2_RESP_ST_STATUS_LINE: - case H2_RESP_ST_HEADERS: - status = get_line(from_h1, bb, f, line, sizeof(line)); - if (status != APR_SUCCESS) { - return status; - } - if (from_h1->state == H2_RESP_ST_STATUS_LINE) { - /* instead of parsing, just take it directly */ - from_h1->http_status = f->r->status; - from_h1->state = H2_RESP_ST_HEADERS; - } - else if (line[0] == '\0') { - /* end of headers, create the h2_response and - * pass the rest of the brigade down the filter - * chain. - */ - status = make_h2_headers(from_h1, f->r); - if (from_h1->bb) { - apr_brigade_destroy(from_h1->bb); - from_h1->bb = NULL; - } - if (!APR_BRIGADE_EMPTY(bb)) { - return ap_pass_brigade(f->next, bb); - } - } - else { - status = parse_header(from_h1, f, line); - } - break; - - default: - return ap_pass_brigade(f->next, bb); - } - - } - - return status; -} - /* This routine is called by apr_table_do and merges all instances of * the passed field values into a single array that will be further * processed by some later routine. Originally intended to help split @@ -345,7 +167,7 @@ static int copy_header(void *ctx, const char *name, const char *value) return 1; } -static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r) +static h2_headers *create_response(h2_task *task, request_rec *r) { const char *clheader; const char *ctype; @@ -471,115 +293,316 @@ static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r) (void *) headers, r->headers_out, NULL); } - return h2_response_rcreate(from_h1->stream_id, r, r->status, - headers, r->pool); + return h2_headers_rcreate(r, r->status, headers, r->pool); } -apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb) +apr_status_t h2_headers_output_filter(ap_filter_t *f, apr_bucket_brigade *bb) { h2_task *task = f->ctx; - h2_from_h1 *from_h1 = task->output.from_h1; request_rec *r = f->r; - apr_bucket *b; + apr_bucket *b, *bresp, *body_bucket = NULL, *next; ap_bucket_error *eb = NULL; + h2_headers *response = NULL; - AP_DEBUG_ASSERT(from_h1 != NULL); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_from_h1(%d): output_filter called", from_h1->stream_id); + "h2_task(%s): output_filter called", task->id); - if (r->header_only && from_h1->response) { - /* throw away any data after we have compiled the response */ - apr_brigade_cleanup(bb); - return OK; + if (!task->output.sent_response) { + /* check, if we need to send the response now. Until we actually + * see a DATA bucket or some EOS/EOR, we do not do so. */ + for (b = APR_BRIGADE_FIRST(bb); + b != APR_BRIGADE_SENTINEL(bb); + b = APR_BUCKET_NEXT(b)) + { + if (AP_BUCKET_IS_ERROR(b) && !eb) { + eb = b->data; + } + else if (AP_BUCKET_IS_EOC(b)) { + /* If we see an EOC bucket it is a signal that we should get out + * of the way doing nothing. + */ + ap_remove_output_filter(f); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, f->c, + "h2_task(%s): eoc bucket passed", task->id); + return ap_pass_brigade(f->next, bb); + } + else if (!H2_BUCKET_IS_HEADERS(b) && !APR_BUCKET_IS_FLUSH(b)) { + body_bucket = b; + break; + } + } + + if (eb) { + int st = eb->status; + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03047) + "h2_task(%s): err bucket status=%d", task->id, st); + /* throw everything away and replace it with the error response + * generated by ap_die() */ + apr_brigade_cleanup(bb); + ap_die(st, r); + return AP_FILTER_ERROR; + } + + if (body_bucket) { + /* time to insert the response bucket before the body */ + response = create_response(task, r); + if (response == NULL) { + ap_log_cerror(APLOG_MARK, APLOG_NOTICE, 0, f->c, APLOGNO(03048) + "h2_task(%s): unable to create response", task->id); + return APR_ENOMEM; + } + + bresp = h2_bucket_headers_create(f->c->bucket_alloc, response); + APR_BUCKET_INSERT_BEFORE(body_bucket, bresp); + /*APR_BRIGADE_INSERT_HEAD(bb, bresp);*/ + task->output.sent_response = 1; + r->sent_bodyct = 1; + } } - for (b = APR_BRIGADE_FIRST(bb); - b != APR_BRIGADE_SENTINEL(bb); - b = APR_BUCKET_NEXT(b)) - { - if (AP_BUCKET_IS_ERROR(b) && !eb) { - eb = b->data; - continue; + if (r->header_only) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_task(%s): header_only, cleanup output brigade", + task->id); + b = body_bucket? body_bucket : APR_BRIGADE_FIRST(bb); + while (b != APR_BRIGADE_SENTINEL(bb)) { + next = APR_BUCKET_NEXT(b); + if (APR_BUCKET_IS_EOS(b) || AP_BUCKET_IS_EOR(b)) { + break; + } + APR_BUCKET_REMOVE(b); + apr_bucket_destroy(b); + b = next; } - /* - * If we see an EOC bucket it is a signal that we should get out - * of the way doing nothing. - */ - if (AP_BUCKET_IS_EOC(b)) { - ap_remove_output_filter(f); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, f->c, - "h2_from_h1(%d): eoc bucket passed", - from_h1->stream_id); - return ap_pass_brigade(f->next, bb); + } + else if (task->output.sent_response) { + /* lets get out of the way, our task is done */ + ap_remove_output_filter(f); + } + return ap_pass_brigade(f->next, bb); +} + +static void make_chunk(h2_task *task, apr_bucket_brigade *bb, + apr_bucket *first, apr_uint64_t chunk_len, + apr_bucket *tail) +{ + /* Surround the buckets [first, tail[ with new buckets carrying the + * HTTP/1.1 chunked encoding format. If tail is NULL, the chunk extends + * to the end of the brigade. */ + char buffer[128]; + apr_bucket *c; + int len; + + len = apr_snprintf(buffer, H2_ALEN(buffer), + "%"APR_UINT64_T_HEX_FMT"\r\n", chunk_len); + c = apr_bucket_heap_create(buffer, len, NULL, bb->bucket_alloc); + APR_BUCKET_INSERT_BEFORE(first, c); + c = apr_bucket_heap_create("\r\n", 2, NULL, bb->bucket_alloc); + if (tail) { + APR_BUCKET_INSERT_BEFORE(tail, c); + } + else { + APR_BRIGADE_INSERT_TAIL(bb, c); + } +} + +static int ser_header(void *ctx, const char *name, const char *value) +{ + apr_bucket_brigade *bb = ctx; + apr_brigade_printf(bb, NULL, NULL, "%s: %s\r\n", name, value); + return 1; +} + +apr_status_t h2_filter_request_in(ap_filter_t* f, + apr_bucket_brigade* bb, + ap_input_mode_t mode, + apr_read_type_e block, + apr_off_t readbytes) +{ + h2_task *task = f->ctx; + request_rec *r = f->r; + apr_status_t status = APR_SUCCESS; + apr_bucket *b, *next, *first_data = NULL; + apr_off_t bblen = 0; + + if (!task->input.chunked) { + status = ap_get_brigade(f->next, bb, mode, block, readbytes); + /* pipe data through, just take care of trailers */ + for (b = APR_BRIGADE_FIRST(bb); + b != APR_BRIGADE_SENTINEL(bb); b = next) { + next = APR_BUCKET_NEXT(b); + if (H2_BUCKET_IS_HEADERS(b)) { + h2_headers *headers = h2_bucket_headers_get(b); + ap_assert(headers); + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, + "h2_task(%s): receiving trailers", task->id); + r->trailers_in = apr_table_clone(r->pool, headers->headers); + APR_BUCKET_REMOVE(b); + apr_bucket_destroy(b); + ap_remove_input_filter(f); + break; + } } + return status; } + + /* Things are more complicated. The standard HTTP input filter, which + * does a lot what we do not want to duplicate, also cares about chunked + * transfer encoding and trailers. + * We need to simulate chunked encoding for it to be happy. + */ - if (eb) { - int st = eb->status; - apr_brigade_cleanup(bb); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03047) - "h2_from_h1(%d): err bucket status=%d", - from_h1->stream_id, st); - ap_die(st, r); - return AP_FILTER_ERROR; + if (!task->input.bbchunk) { + task->input.bbchunk = apr_brigade_create(r->pool, f->c->bucket_alloc); } - - from_h1->response = create_response(from_h1, r); - if (from_h1->response == NULL) { - ap_log_cerror(APLOG_MARK, APLOG_NOTICE, 0, f->c, APLOGNO(03048) - "h2_from_h1(%d): unable to create response", - from_h1->stream_id); - return APR_ENOMEM; + if (APR_BRIGADE_EMPTY(task->input.bbchunk)) { + /* get more data from the lower layer filters. Always do this + * in larger pieces, since we handle the read modes ourself. + */ + status = ap_get_brigade(f->next, task->input.bbchunk, + AP_MODE_READBYTES, block, 32*1024); + if (status == APR_EOF) { + if (!task->input.eos) { + status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n"); + task->input.eos = 1; + return APR_SUCCESS; + } + ap_remove_input_filter(f); + return status; + + } + else if (status != APR_SUCCESS) { + return status; + } + + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, + "h2_task(%s): trailers_in inspecting brigade", task->id); + for (b = APR_BRIGADE_FIRST(task->input.bbchunk); + b != APR_BRIGADE_SENTINEL(task->input.bbchunk) && !task->input.eos; + b = next) { + next = APR_BUCKET_NEXT(b); + if (APR_BUCKET_IS_METADATA(b)) { + if (first_data) { + make_chunk(task, task->input.bbchunk, first_data, bblen, b); + first_data = NULL; + bblen = 0; + } + + if (H2_BUCKET_IS_HEADERS(b)) { + apr_bucket_brigade *tmp; + h2_headers *headers = h2_bucket_headers_get(b); + + ap_assert(headers); + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, + "h2_task(%s): receiving trailers", task->id); + tmp = apr_brigade_split_ex(task->input.bbchunk, b, NULL); + if (!apr_is_empty_table(headers->headers)) { + status = apr_brigade_puts(task->input.bbchunk, NULL, NULL, "0\r\n"); + apr_table_do(ser_header, task->input.bbchunk, headers->headers, NULL); + status = apr_brigade_puts(task->input.bbchunk, NULL, NULL, "\r\n"); + } + else { + status = apr_brigade_puts(task->input.bbchunk, NULL, NULL, "0\r\n\r\n"); + } + APR_BRIGADE_CONCAT(task->input.bbchunk, tmp); + apr_brigade_destroy(tmp); + r->trailers_in = apr_table_clone(r->pool, headers->headers); + APR_BUCKET_REMOVE(b); + apr_bucket_destroy(b); + task->input.eos = 1; + } + else if (APR_BUCKET_IS_EOS(b)) { + apr_bucket_brigade *tmp = apr_brigade_split_ex(task->input.bbchunk, b, NULL); + status = apr_brigade_puts(task->input.bbchunk, NULL, NULL, "0\r\n\r\n"); + APR_BRIGADE_CONCAT(task->input.bbchunk, tmp); + apr_brigade_destroy(tmp); + task->input.eos = 1; + } + break; + } + else if (b->length == 0) { + APR_BUCKET_REMOVE(b); + apr_bucket_destroy(b); + } + else { + if (!first_data) { + first_data = b; + } + bblen += b->length; + } + } + + if (first_data) { + make_chunk(task, task->input.bbchunk, first_data, bblen, NULL); + } + } + + if (mode == AP_MODE_EXHAUSTIVE) { + /* return all we have */ + APR_BRIGADE_CONCAT(bb, task->input.bbchunk); + } + else if (mode == AP_MODE_READBYTES) { + status = h2_brigade_concat_length(bb, task->input.bbchunk, readbytes); + } + else if (mode == AP_MODE_SPECULATIVE) { + status = h2_brigade_copy_length(bb, task->input.bbchunk, readbytes); + } + else if (mode == AP_MODE_GETLINE) { + /* we are reading a single LF line, e.g. the HTTP headers. + * this has the nasty side effect to split the bucket, even + * though it ends with CRLF and creates a 0 length bucket */ + status = apr_brigade_split_line(bb, task->input.bbchunk, block, + HUGE_STRING_LEN); + if (APLOGctrace1(f->c)) { + char buffer[1024]; + apr_size_t len = sizeof(buffer)-1; + apr_brigade_flatten(bb, buffer, &len); + buffer[len] = 0; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_task(%s): getline: %s", + task->id, buffer); + } } - - if (r->header_only) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_from_h1(%d): header_only, cleanup output brigade", - from_h1->stream_id); - apr_brigade_cleanup(bb); - return OK; + else { + /* Hmm, well. There is mode AP_MODE_EATCRLF, but we chose not + * to support it. Seems to work. */ + ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOTIMPL, f->c, + APLOGNO(02942) + "h2_task, unsupported READ mode %d", mode); + status = APR_ENOTIMPL; } - r->sent_bodyct = 1; /* Whatever follows is real body stuff... */ - - ap_remove_output_filter(f); - if (APLOGctrace1(f->c)) { - apr_off_t len = 0; - apr_brigade_length(bb, 0, &len); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_from_h1(%d): removed header filter, passing brigade " - "len=%ld", from_h1->stream_id, (long)len); - } - return ap_pass_brigade(f->next, bb); + h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, "forwarding input", bb); + return status; } -apr_status_t h2_response_trailers_filter(ap_filter_t *f, apr_bucket_brigade *bb) +apr_status_t h2_filter_trailers_out(ap_filter_t *f, apr_bucket_brigade *bb) { h2_task *task = f->ctx; - h2_from_h1 *from_h1 = task->output.from_h1; request_rec *r = f->r; - apr_bucket *b; + apr_bucket *b, *e; - if (from_h1 && from_h1->response) { - /* Detect the EOR bucket and forward any trailers that may have - * been set to our h2_response. + if (task && r) { + /* Detect the EOS/EOR bucket and forward any trailers that may have + * been set to our h2_headers. */ for (b = APR_BRIGADE_FIRST(bb); b != APR_BRIGADE_SENTINEL(bb); b = APR_BUCKET_NEXT(b)) { - if (AP_BUCKET_IS_EOR(b)) { - /* FIXME: need a better test case than this. - apr_table_setn(r->trailers_out, "X", "1"); */ - if (r->trailers_out && !apr_is_empty_table(r->trailers_out)) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03049) - "h2_from_h1(%d): trailers filter, saving trailers", - from_h1->stream_id); - h2_response_set_trailers(from_h1->response, - apr_table_clone(from_h1->pool, - r->trailers_out)); - } + if ((APR_BUCKET_IS_EOS(b) || AP_BUCKET_IS_EOR(b)) + && r->trailers_out && !apr_is_empty_table(r->trailers_out)) { + h2_headers *headers; + apr_table_t *trailers; + + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03049) + "h2_task(%s): sending trailers", task->id); + trailers = apr_table_clone(r->pool, r->trailers_out); + headers = h2_headers_rcreate(r, HTTP_OK, trailers, r->pool); + e = h2_bucket_headers_create(bb->bucket_alloc, headers); + APR_BUCKET_INSERT_BEFORE(b, e); + apr_table_clear(r->trailers_out); + ap_remove_output_filter(f); break; } } diff --git a/modules/http2/h2_from_h1.h b/modules/http2/h2_from_h1.h index 71cc35faa9..9215539668 100644 --- a/modules/http2/h2_from_h1.h +++ b/modules/http2/h2_from_h1.h @@ -30,44 +30,18 @@ * we need to have all handlers and filters involved in request/response * processing, so this seems to be the way for now. */ +struct h2_headers; +struct h2_task; -typedef enum { - H2_RESP_ST_STATUS_LINE, /* parsing http/1 status line */ - H2_RESP_ST_HEADERS, /* parsing http/1 response headers */ - H2_RESP_ST_BODY, /* transferring response body */ - H2_RESP_ST_DONE /* complete response converted */ -} h2_from_h1_state_t; +apr_status_t h2_headers_output_filter(ap_filter_t *f, apr_bucket_brigade *bb); -struct h2_response; +apr_status_t h2_filter_request_in(ap_filter_t* f, + apr_bucket_brigade* brigade, + ap_input_mode_t mode, + apr_read_type_e block, + apr_off_t readbytes); -typedef struct h2_from_h1 h2_from_h1; - -struct h2_from_h1 { - int stream_id; - h2_from_h1_state_t state; - apr_pool_t *pool; - apr_bucket_brigade *bb; - - apr_off_t content_length; - int chunked; - - int http_status; - apr_array_header_t *hlines; - - struct h2_response *response; -}; - - -h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool); - -apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1, - ap_filter_t* f, apr_bucket_brigade* bb); - -struct h2_response *h2_from_h1_get_response(h2_from_h1 *from_h1); - -apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb); - -apr_status_t h2_response_trailers_filter(ap_filter_t *f, apr_bucket_brigade *bb); +apr_status_t h2_filter_trailers_out(ap_filter_t *f, apr_bucket_brigade *bb); void h2_from_h1_set_basic_http_header(apr_table_t *headers, request_rec *r, apr_pool_t *pool); diff --git a/modules/http2/h2_h2.c b/modules/http2/h2_h2.c index fd5338406b..80e1afadfb 100644 --- a/modules/http2/h2_h2.c +++ b/modules/http2/h2_h2.c @@ -32,12 +32,15 @@ #include "mod_http2.h" #include "h2_private.h" +#include "h2_bucket_beam.h" #include "h2_stream.h" #include "h2_task.h" #include "h2_config.h" #include "h2_ctx.h" #include "h2_conn.h" +#include "h2_filter.h" #include "h2_request.h" +#include "h2_headers.h" #include "h2_session.h" #include "h2_util.h" #include "h2_h2.h" @@ -569,6 +572,10 @@ void h2_h2_register_hooks(void) */ ap_hook_post_read_request(h2_h2_post_read_req, NULL, NULL, APR_HOOK_REALLY_FIRST); ap_hook_fixups(h2_h2_late_fixups, NULL, NULL, APR_HOOK_LAST); + + /* special bucket type transfer through a h2_bucket_beam */ + ap_hook_beam_bucket(h2_bucket_observer_beam, NULL, NULL, APR_HOOK_MIDDLE); + ap_hook_beam_bucket(h2_bucket_headers_beam, NULL, NULL, APR_HOOK_MIDDLE); } int h2_h2_process_conn(conn_rec* c) @@ -684,30 +691,23 @@ static int h2_h2_post_read_req(request_rec *r) * that we manipulate filters only once. */ if (task && !task->filters_set) { ap_filter_t *f; + ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r, "adding request filters"); - /* setup the correct output filters to process the response - * on the proper mod_http2 way. */ - ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r, "adding task output filter"); - if (task->ser_headers) { - ap_add_output_filter("H1_TO_H2_RESP", task, r, r->connection); - } - else { - /* replace the core http filter that formats response headers - * in HTTP/1 with our own that collects status and headers */ - ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER"); - ap_add_output_filter("H2_RESPONSE", task, r, r->connection); - } + /* setup the correct filters to process the request for h2 */ + ap_add_input_filter("H2_REQUEST", task, r, r->connection); + + /* replace the core http filter that formats response headers + * in HTTP/1 with our own that collects status and headers */ + ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER"); + ap_add_output_filter("H2_RESPONSE", task, r, r->connection); - /* trailers processing. Incoming trailers are added to this - * request via our h2 input filter, outgoing trailers - * in a special h2 out filter. */ for (f = r->input_filters; f; f = f->next) { - if (!strcmp("H2_TO_H1", f->frec->name)) { + if (!strcmp("H2_SLAVE_IN", f->frec->name)) { f->r = r; break; } } - ap_add_output_filter("H2_TRAILERS", task, r, r->connection); + ap_add_output_filter("H2_TRAILERS_OUT", task, r, r->connection); task->filters_set = 1; } } diff --git a/modules/http2/h2_headers.c b/modules/http2/h2_headers.c new file mode 100644 index 0000000000..8add79f507 --- /dev/null +++ b/modules/http2/h2_headers.c @@ -0,0 +1,161 @@ +/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <assert.h> +#include <stdio.h> + +#include <apr_strings.h> + +#include <httpd.h> +#include <http_core.h> +#include <http_log.h> +#include <util_time.h> + +#include <nghttp2/nghttp2.h> + +#include "h2_private.h" +#include "h2_h2.h" +#include "h2_util.h" +#include "h2_request.h" +#include "h2_headers.h" + + +typedef struct { + apr_bucket_refcount refcount; + h2_headers *headers; +} h2_bucket_headers; + +static apr_status_t bucket_read(apr_bucket *b, const char **str, + apr_size_t *len, apr_read_type_e block) +{ + (void)b; + (void)block; + *str = NULL; + *len = 0; + return APR_SUCCESS; +} + +apr_bucket * h2_bucket_headers_make(apr_bucket *b, h2_headers *r) +{ + h2_bucket_headers *br; + + br = apr_bucket_alloc(sizeof(*br), b->list); + br->headers = r; + + b = apr_bucket_shared_make(b, br, 0, 0); + b->type = &h2_bucket_type_headers; + + return b; +} + +apr_bucket * h2_bucket_headers_create(apr_bucket_alloc_t *list, + h2_headers *r) +{ + apr_bucket *b = apr_bucket_alloc(sizeof(*b), list); + + APR_BUCKET_INIT(b); + b->free = apr_bucket_free; + b->list = list; + b = h2_bucket_headers_make(b, r); + return b; +} + +h2_headers *h2_bucket_headers_get(apr_bucket *b) +{ + if (H2_BUCKET_IS_HEADERS(b)) { + return ((h2_bucket_headers *)b->data)->headers; + } + return NULL; +} + +const apr_bucket_type_t h2_bucket_type_headers = { + "H2HEADERS", 5, APR_BUCKET_METADATA, + apr_bucket_destroy_noop, + bucket_read, + apr_bucket_setaside_noop, + apr_bucket_split_notimpl, + apr_bucket_shared_copy +}; + +apr_bucket *h2_bucket_headers_beam(struct h2_bucket_beam *beam, + apr_bucket_brigade *dest, + const apr_bucket *src) +{ + if (H2_BUCKET_IS_HEADERS(src)) { + h2_headers *r = ((h2_bucket_headers *)src->data)->headers; + apr_bucket *b = h2_bucket_headers_create(dest->bucket_alloc, r); + APR_BRIGADE_INSERT_TAIL(dest, b); + return b; + } + return NULL; +} + + +h2_headers *h2_headers_create(int status, apr_table_t *headers_in, + apr_table_t *notes, apr_pool_t *pool) +{ + h2_headers *headers = apr_pcalloc(pool, sizeof(h2_headers)); + headers->status = status; + headers->headers = (headers_in? apr_table_copy(pool, headers_in) + : apr_table_make(pool, 5)); + headers->notes = (notes? apr_table_copy(pool, notes) + : apr_table_make(pool, 5)); + return headers; +} + +h2_headers *h2_headers_rcreate(request_rec *r, int status, + apr_table_t *header, apr_pool_t *pool) +{ + h2_headers *headers = h2_headers_create(status, header, r->notes, pool); + if (headers->status == HTTP_FORBIDDEN) { + const char *cause = apr_table_get(r->notes, "ssl-renegotiate-forbidden"); + if (cause) { + /* This request triggered a TLS renegotiation that is now allowed + * in HTTP/2. Tell the client that it should use HTTP/1.1 for this. + */ + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, headers->status, r, + APLOGNO(03061) + "h2_headers(%ld): renegotiate forbidden, cause: %s", + (long)r->connection->id, cause); + headers->status = H2_ERR_HTTP_1_1_REQUIRED; + } + } + return headers; +} + +h2_headers *h2_headers_die(apr_status_t type, + const h2_request *req, apr_pool_t *pool) +{ + h2_headers *headers; + char *date; + + headers = apr_pcalloc(pool, sizeof(h2_headers)); + headers->status = (type >= 200 && type < 600)? type : 500; + headers->headers = apr_table_make(pool, 5); + headers->notes = apr_table_make(pool, 5); + + date = apr_palloc(pool, APR_RFC822_DATE_LEN); + ap_recent_rfc822_date(date, req? req->request_time : apr_time_now()); + apr_table_setn(headers->headers, "Date", date); + apr_table_setn(headers->headers, "Server", ap_get_server_banner()); + + return headers; +} + +int h2_headers_are_response(h2_headers *headers) +{ + return headers->status >= 200; +} + diff --git a/modules/http2/h2_headers.h b/modules/http2/h2_headers.h new file mode 100644 index 0000000000..2078cfb705 --- /dev/null +++ b/modules/http2/h2_headers.h @@ -0,0 +1,70 @@ +/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __mod_h2__h2_headers__ +#define __mod_h2__h2_headers__ + +#include "h2.h" + +struct h2_bucket_beam; + +extern const apr_bucket_type_t h2_bucket_type_headers; + +#define H2_BUCKET_IS_HEADERS(e) (e->type == &h2_bucket_type_headers) + +apr_bucket * h2_bucket_headers_make(apr_bucket *b, h2_headers *r); + +apr_bucket * h2_bucket_headers_create(apr_bucket_alloc_t *list, + h2_headers *r); + +h2_headers *h2_bucket_headers_get(apr_bucket *b); + +apr_bucket *h2_bucket_headers_beam(struct h2_bucket_beam *beam, + apr_bucket_brigade *dest, + const apr_bucket *src); + +/** + * Create the headers from the given status and headers + * @param status the headers status + * @param header the headers of the headers + * @param notes the notes carried by the headers + * @param pool the memory pool to use + */ +h2_headers *h2_headers_create(int status, apr_table_t *header, + apr_table_t *notes, apr_pool_t *pool); + +/** + * Create the headers from the given request_rec. + * @param r the request record which was processed + * @param status the headers status + * @param header the headers of the headers + * @param pool the memory pool to use + */ +h2_headers *h2_headers_rcreate(request_rec *r, int status, + apr_table_t *header, apr_pool_t *pool); + +/** + * Create the headers for the given error. + * @param stream_id id of the stream to create the headers for + * @param type the error code + * @param req the original h2_request + * @param pool the memory pool to use + */ +h2_headers *h2_headers_die(apr_status_t type, + const struct h2_request *req, apr_pool_t *pool); + +int h2_headers_are_response(h2_headers *headers); + +#endif /* defined(__mod_h2__h2_headers__) */ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index d5de4fe17f..461c88d64c 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -28,13 +28,13 @@ #include "mod_http2.h" +#include "h2.h" #include "h2_private.h" #include "h2_bucket_beam.h" #include "h2_config.h" #include "h2_conn.h" #include "h2_ctx.h" #include "h2_h2.h" -#include "h2_response.h" #include "h2_mplx.h" #include "h2_ngn_shed.h" #include "h2_request.h" @@ -297,7 +297,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->q = h2_iq_create(m->pool, m->max_streams); m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id)); - m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id)); m->stream_timeout = stream_timeout; @@ -444,7 +443,6 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) */ h2_iq_remove(m->q, stream->id); h2_ihash_remove(m->sready, stream->id); - h2_ihash_remove(m->sresume, stream->id); h2_ihash_remove(m->streams, stream->id); if (stream->input) { m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); @@ -516,12 +514,10 @@ static int task_print(void *ctx, void *val) h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ - "->03198: h2_stream(%s): %s %s %s -> %s %d" + "->03198: h2_stream(%s): %s %s %s" "[orph=%d/started=%d/done=%d/frozen=%d]", task->id, task->request->method, task->request->authority, task->request->path, - task->response? "http" : (task->rst_error? "reset" : "?"), - task->response? task->response->http_status : task->rst_error, (stream? 0 : 1), task->worker_started, task->worker_done, task->frozen); } @@ -545,7 +541,7 @@ static int task_abort_connection(void *ctx, void *val) if (task->input.beam) { h2_beam_abort(task->input.beam); } - if (task->worker_started && !task->worker_done && task->output.beam) { + if (task->output.beam) { h2_beam_abort(task->output.beam); } return 1; @@ -556,9 +552,9 @@ static int report_stream_iter(void *ctx, void *val) { h2_stream *stream = val; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, " - "submitted=%d, suspended=%d", + "ready=%d", m->id, stream->id, stream->started, stream->scheduled, - stream->submitted, stream->suspended); + h2_stream_is_ready(stream)); return 1; } @@ -575,9 +571,8 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): release_join with %d streams open, " - "%d streams resume, %d streams ready, %d tasks", + "%d streams ready, %d tasks", m->id, (int)h2_ihash_count(m->streams), - (int)h2_ihash_count(m->sresume), (int)h2_ihash_count(m->sready), (int)h2_ihash_count(m->tasks)); h2_ihash_iter(m->streams, report_stream_iter, m); @@ -707,6 +702,19 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream) return status; } +h2_stream *h2_mplx_stream_get(h2_mplx *m, apr_uint32_t id) +{ + h2_stream *s = NULL; + int acquired; + + AP_DEBUG_ASSERT(m); + if ((enter_mutex(m, &acquired)) == APR_SUCCESS) { + s = h2_ihash_get(m->streams, id); + leave_mutex(m, acquired); + } + return s; +} + void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) { m->input_consumed = cb; @@ -730,31 +738,26 @@ static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) } } -static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response) +static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) { apr_status_t status = APR_SUCCESS; h2_task *task = h2_ihash_get(m->tasks, stream_id); h2_stream *stream = h2_ihash_get(m->streams, stream_id); + apr_size_t beamed_count; if (!task || !stream) { return APR_ECONNABORTED; } - status = h2_task_add_response(task, response); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, - "h2_mplx(%s): add response: %d, rst=%d", - task->id, response->http_status, response->rst_error); - if (status != APR_SUCCESS) { - return status; - } - - if (task->output.beam && !task->output.opened) { - apr_uint32_t beamed_count; - h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem); - h2_beam_timeout_set(task->output.beam, m->stream_timeout); - h2_beam_on_consumed(task->output.beam, stream_output_consumed, task); - h2_beam_on_produced(task->output.beam, output_produced, m); - beamed_count = h2_beam_get_files_beamed(task->output.beam); + "h2_mplx(%s): out open", task->id); + + if (!stream->output) { + h2_beam_buffer_size_set(beam, m->stream_max_mem); + h2_beam_timeout_set(beam, m->stream_timeout); + h2_beam_on_consumed(beam, stream_output_consumed, task); + h2_beam_on_produced(beam, output_produced, m); + beamed_count = h2_beam_get_files_beamed(beam); if (m->tx_handles_reserved >= beamed_count) { m->tx_handles_reserved -= beamed_count; } @@ -762,22 +765,20 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response) m->tx_handles_reserved = 0; } if (!task->output.copy_files) { - h2_beam_on_file_beam(task->output.beam, can_beam_file, m); + h2_beam_on_file_beam(beam, can_beam_file, m); } - h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m); - task->output.opened = 1; + h2_beam_mutex_set(beam, beam_enter, task->cond, m); + stream->output = beam; } - if (response && response->http_status < 300) { - /* we might see some file buckets in the output, see - * if we have enough handles reserved. */ - check_tx_reservation(m); - } - have_out_data_for(m, stream, 1); + /* we might see some file buckets in the output, see + * if we have enough handles reserved. */ + check_tx_reservation(m); + have_out_data_for(m, stream, 0); return status; } -apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response) +apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) { apr_status_t status; int acquired; @@ -788,7 +789,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response) status = APR_ECONNABORTED; } else { - status = out_open(m, stream_id, response); + status = out_open(m, stream_id, beam); } leave_mutex(m, acquired); } @@ -809,16 +810,6 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task) return APR_ECONNABORTED; } - if (!task->response && !task->rst_error) { - /* In case a close comes before a response was created, - * insert an error one so that our streams can properly reset. - */ - h2_response *r = h2_response_die(task->stream_id, 500, - task->request, m->pool); - status = out_open(m, task->stream_id, r); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, APLOGNO(03393) - "h2_mplx(%s): close, no response, no rst", task->id); - } ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, "h2_mplx(%s): close", task->id); if (task->output.beam) { @@ -842,7 +833,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, if (m->aborted) { status = APR_ECONNABORTED; } - else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) { + else if (!h2_ihash_empty(m->sready)) { status = APR_SUCCESS; } else { @@ -863,13 +854,10 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response) { - h2_ihash_t *set; ap_assert(m); ap_assert(stream); - - set = response? m->sready : m->sresume; - if (!h2_ihash_get(set, stream->id)) { - h2_ihash_add(set, stream); + if (!h2_ihash_get(m->sready, stream->id)) { + h2_ihash_add(m->sready, stream); if (m->added_output) { apr_thread_cond_signal(m->added_output); } @@ -910,25 +898,20 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, } else { h2_ihash_add(m->streams, stream); - if (stream->response) { - /* already have a respone, schedule for submit */ + if (h2_stream_is_ready(stream)) { h2_ihash_add(m->sready, stream); } else { - h2_beam_create(&stream->input, stream->pool, stream->id, - "input", 0); if (!m->need_registration) { m->need_registration = h2_iq_empty(m->q); } if (m->workers_busy < m->workers_max) { do_registration = m->need_registration; } - h2_iq_add(m->q, stream->id, cmp, ctx); - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, - "h2_mplx(%ld-%d): process, body=%d", - m->c->id, stream->id, stream->request->body); + h2_iq_add(m->q, stream->id, cmp, ctx); } + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, + "h2_mplx(%ld-%d): process", m->c->id, stream->id); } leave_mutex(m, acquired); } @@ -939,7 +922,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, return status; } -static h2_task *pop_task(h2_mplx *m) +static h2_task *next_stream_task(h2_mplx *m) { h2_task *task = NULL; h2_stream *stream; @@ -957,13 +940,13 @@ static h2_task *pop_task(h2_mplx *m) slave = *pslave; } else { - slave = h2_slave_create(m->c, m->pool, NULL); + slave = h2_slave_create(m->c, stream->id, m->pool, NULL); new_conn = 1; } slave->sbh = m->c->sbh; slave->aborted = 0; - task = h2_task_create(slave, stream->request, stream->input, m); + task = h2_task_create(slave, stream->id, stream->request, stream->input, m); h2_ihash_add(m->tasks, task); m->c->keepalives++; @@ -1003,7 +986,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) *has_more = 0; } else { - task = pop_task(m); + task = next_stream_task(m); *has_more = !h2_iq_empty(m->q); } @@ -1022,9 +1005,6 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) * and the original worker has finished. That means the * engine may start processing now. */ h2_task_thaw(task); - /* we do not want the task to block on writing response - * bodies into the mplx. */ - h2_task_set_io_blocking(task, 0); apr_thread_cond_broadcast(m->task_thawed); return; } @@ -1141,7 +1121,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) --m->workers_busy; if (ptask) { /* caller wants another task */ - *ptask = pop_task(m); + *ptask = next_stream_task(m); } leave_mutex(m, acquired); } @@ -1154,15 +1134,19 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) static int latest_repeatable_unsubmitted_iter(void *data, void *val) { task_iter_ctx *ctx = data; + h2_stream *stream; h2_task *task = val; if (!task->worker_done && h2_task_can_redo(task) && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) { - /* this task occupies a worker, the response has not been submitted yet, - * not been cancelled and it is a repeatable request - * -> it can be re-scheduled later */ - if (!ctx->task || ctx->task->started_at < task->started_at) { - /* we did not have one or this one was started later */ - ctx->task = task; + stream = h2_ihash_get(ctx->m->streams, task->stream_id); + if (stream && !h2_stream_is_ready(stream)) { + /* this task occupies a worker, the response has not been submitted + * yet, not been cancelled and it is a repeatable request + * -> it can be re-scheduled later */ + if (!ctx->task || ctx->task->started_at < task->started_at) { + /* we did not have one or this one was started later */ + ctx->task = task; + } } } return 1; @@ -1329,13 +1313,12 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, return APR_ECONNABORTED; } m = task->mplx; - task->r = r; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); if (stream) { - status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit); + status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit); } else { status = APR_ECONNABORTED; @@ -1353,7 +1336,6 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn); h2_mplx *m = h2_ngn_shed_get_ctx(shed); apr_status_t status; - h2_task *task = NULL; int acquired; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { @@ -1368,22 +1350,21 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, * had and, if not, wait a short while before doing the * blocking, and if unsuccessful, terminating read. */ - status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task); + status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr); if (APR_STATUS_IS_EAGAIN(status)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): start block engine pull", m->id); apr_thread_cond_timedwait(m->task_thawed, m->lock, apr_time_from_msec(20)); - status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task); + status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr); } } else { - status = h2_ngn_shed_pull_task(shed, ngn, capacity, - want_shutdown, &task); + status = h2_ngn_shed_pull_request(shed, ngn, capacity, + want_shutdown, pr); } leave_mutex(m, acquired); } - *pr = task? task->r : NULL; return status; } @@ -1423,14 +1404,12 @@ static int update_window(void *ctx, void *val) apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, stream_ev_callback *on_resume, - stream_ev_callback *on_response, void *on_ctx) { apr_status_t status; int acquired; int streams[32]; h2_stream *stream; - h2_task *task; size_t i, n; AP_DEBUG_ASSERT(m); @@ -1440,8 +1419,7 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, /* update input windows for streams */ h2_ihash_iter(m->streams, update_window, m); - - if (on_response && !h2_ihash_empty(m->sready)) { + if (on_resume && !h2_ihash_empty(m->sready)) { n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams)); for (i = 0; i < n; ++i) { stream = h2_ihash_get(m->streams, streams[i]); @@ -1449,49 +1427,9 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, continue; } ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, - "h2_mplx(%ld-%d): on_response", - m->id, stream->id); - task = h2_ihash_get(m->tasks, stream->id); - if (task) { - task->response_sent = 1; - if (task->rst_error) { - h2_stream_rst(stream, task->rst_error); - } - else { - AP_DEBUG_ASSERT(task->response); - status = h2_stream_add_response(stream, task->response, - task->output.beam); - if (status != APR_SUCCESS) { - h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); - } - if (!h2_response_get_final(task->response)) { - /* the final response needs still to arrive */ - task->response = NULL; - } - } - } - else { - /* We have the stream ready without a task. This happens - * when we fail streams early. A response should already - * be present. */ - AP_DEBUG_ASSERT(stream->response || stream->rst_error); - } - status = on_response(on_ctx, stream->id); - } - } - - if (on_resume && !h2_ihash_empty(m->sresume)) { - n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams)); - for (i = 0; i < n; ++i) { - stream = h2_ihash_get(m->streams, streams[i]); - if (!stream) { - continue; - } - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, "h2_mplx(%ld-%d): on_resume", m->id, stream->id); - h2_stream_set_suspended(stream, 0); - status = on_resume(on_ctx, stream->id); + on_resume(on_ctx, stream->id); } } @@ -1500,25 +1438,36 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, return status; } -apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id) +apr_status_t h2_mplx_keep_active(h2_mplx *m, apr_uint32_t stream_id) { apr_status_t status; - h2_stream *stream; - h2_task *task; int acquired; AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - stream = h2_ihash_get(m->streams, stream_id); - if (stream && !h2_ihash_get(m->sresume, stream->id)) { - /* not marked for resume again already */ - h2_stream_set_suspended(stream, 1); - task = h2_ihash_get(m->tasks, stream->id); - if (stream->started && (!task || task->worker_done)) { - h2_ihash_add(m->sresume, stream); - } + h2_stream *s = h2_ihash_get(m->streams, stream_id); + if (s) { + h2_ihash_add(m->sready, s); } leave_mutex(m, acquired); } return status; } + +int h2_mplx_awaits_data(h2_mplx *m) +{ + apr_status_t status; + int acquired, waiting = 1; + + AP_DEBUG_ASSERT(m); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + if (h2_ihash_empty(m->streams)) { + waiting = 0; + } + if (h2_iq_empty(m->q) && h2_ihash_empty(m->tasks)) { + waiting = 0; + } + leave_mutex(m, acquired); + } + return waiting; +} diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 229518cb21..308facd895 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -40,7 +40,6 @@ struct apr_thread_cond_t; struct h2_bucket_beam; struct h2_config; struct h2_ihash_t; -struct h2_response; struct h2_task; struct h2_stream; struct h2_request; @@ -76,9 +75,8 @@ struct h2_mplx { struct h2_ihash_t *spurge; /* all streams done, ready for destroy */ struct h2_iqueue *q; /* all stream ids that need to be started */ - struct h2_ihash_t *sready; /* all streams ready for response */ - struct h2_ihash_t *sresume; /* all streams that can be resumed */ - + struct h2_ihash_t *sready; /* all streams ready for output */ + struct h2_ihash_t *tasks; /* all tasks started and not destroyed */ struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */ @@ -164,6 +162,8 @@ int h2_mplx_is_busy(h2_mplx *m); * IO lifetime of streams. ******************************************************************************/ +struct h2_stream *h2_mplx_stream_get(h2_mplx *m, apr_uint32_t id); + /** * Notifies mplx that a stream has finished processing. * @@ -181,6 +181,8 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream); apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, struct apr_thread_cond_t *iowait); +apr_status_t h2_mplx_keep_active(h2_mplx *m, apr_uint32_t stream_id); + /******************************************************************************* * Stream processing. ******************************************************************************/ @@ -222,16 +224,15 @@ typedef apr_status_t stream_ev_callback(void *ctx, int stream_id); /** * Dispatch events for the master connection, such as - * - resume: new output data has arrived for a suspended stream - * - response: the response for a stream is ready + ± @param m the multiplexer + * @param on_resume new output data has arrived for a suspended stream + * @param ctx user supplied argument to invocation. */ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, stream_ev_callback *on_resume, - stream_ev_callback *on_response, void *ctx); -apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id); - +int h2_mplx_awaits_data(h2_mplx *m); typedef int h2_mplx_stream_cb(struct h2_stream *s, void *ctx); @@ -245,7 +246,7 @@ apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx); * Opens the output for the given stream with the specified response. */ apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id, - struct h2_response *response); + struct h2_bucket_beam *beam); /******************************************************************************* * h2_mplx list Manipulation. diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c index 14e57a7aa3..6be6d24d71 100644 --- a/modules/http2/h2_ngn_shed.c +++ b/modules/http2/h2_ngn_shed.c @@ -35,7 +35,6 @@ #include "h2_ctx.h" #include "h2_h2.h" #include "h2_mplx.h" -#include "h2_response.h" #include "h2_request.h" #include "h2_task.h" #include "h2_util.h" @@ -46,6 +45,7 @@ typedef struct h2_ngn_entry h2_ngn_entry; struct h2_ngn_entry { APR_RING_ENTRY(h2_ngn_entry) link; h2_task *task; + request_rec *r; }; #define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link) @@ -144,26 +144,28 @@ void h2_ngn_shed_abort(h2_ngn_shed *shed) shed->aborted = 1; } -static void ngn_add_task(h2_req_engine *ngn, h2_task *task) +static void ngn_add_task(h2_req_engine *ngn, h2_task *task, request_rec *r) { h2_ngn_entry *entry = apr_pcalloc(task->pool, sizeof(*entry)); APR_RING_ELEM_INIT(entry, link); entry->task = task; + entry->r = r; H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry); } -apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, - h2_task *task, http2_req_engine_init *einit) +apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type, + request_rec *r, + http2_req_engine_init *einit) { h2_req_engine *ngn; + h2_task *task = h2_ctx_rget_task(r); - AP_DEBUG_ASSERT(shed); - + ap_assert(task); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, "h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id, task->id); - if (task->ser_headers) { + if (task->request->serialize) { /* Max compatibility, deny processing of this */ return APR_EOF; } @@ -184,7 +186,7 @@ apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, if (!h2_task_is_detached(task)) { h2_task_freeze(task); } - ngn_add_task(ngn, task); + ngn_add_task(ngn, task, r); ngn->no_assigned++; return APR_SUCCESS; } @@ -207,7 +209,7 @@ apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, APR_RING_INIT(&newngn->entries, h2_ngn_entry, link); status = einit(newngn, newngn->id, newngn->type, newngn->pool, - shed->req_buffer_size, task->r, + shed->req_buffer_size, r, &newngn->out_consumed, &newngn->out_consumed_ctx); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03395) "h2_ngn_shed(%ld): create engine %s (%s)", @@ -242,16 +244,16 @@ static h2_ngn_entry *pop_detached(h2_req_engine *ngn) return NULL; } -apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, - h2_req_engine *ngn, - apr_uint32_t capacity, - int want_shutdown, - h2_task **ptask) +apr_status_t h2_ngn_shed_pull_request(h2_ngn_shed *shed, + h2_req_engine *ngn, + apr_uint32_t capacity, + int want_shutdown, + request_rec **pr) { h2_ngn_entry *entry; AP_DEBUG_ASSERT(ngn); - *ptask = NULL; + *pr = NULL; ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, APLOGNO(03396) "h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d", shed->c->id, ngn->id, want_shutdown); @@ -279,7 +281,7 @@ apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, "h2_ngn_shed(%ld): pulled request %s for engine %s", shed->c->id, entry->task->id, ngn->id); ngn->no_live++; - *ptask = entry->task; + *pr = entry->r; entry->task->assigned = ngn; /* task will now run in ngn's own thread. Modules like lua * seem to require the correct thread set in the conn_rec. diff --git a/modules/http2/h2_ngn_shed.h b/modules/http2/h2_ngn_shed.h index 832dbd3a8e..1f61466a5c 100644 --- a/modules/http2/h2_ngn_shed.h +++ b/modules/http2/h2_ngn_shed.h @@ -58,13 +58,13 @@ h2_ngn_shed *h2_ngn_shed_get_shed(struct h2_req_engine *ngn); void h2_ngn_shed_abort(h2_ngn_shed *shed); -apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, - struct h2_task *task, - h2_shed_ngn_init *init_cb); +apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type, + request_rec *r, + h2_shed_ngn_init *init_cb); -apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, h2_req_engine *pub_ngn, - apr_uint32_t capacity, - int want_shutdown, struct h2_task **ptask); +apr_status_t h2_ngn_shed_pull_request(h2_ngn_shed *shed, h2_req_engine *pub_ngn, + apr_uint32_t capacity, + int want_shutdown, request_rec **pr); apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, struct h2_req_engine *ngn, diff --git a/modules/http2/h2_proxy_util.c b/modules/http2/h2_proxy_util.c index 040671172c..39e139d5c8 100644 --- a/modules/http2/h2_proxy_util.c +++ b/modules/http2/h2_proxy_util.c @@ -566,7 +566,6 @@ static h2_request *h2_req_createn(int id, apr_pool_t *pool, const char *method, { h2_request *req = apr_pcalloc(pool, sizeof(h2_request)); - req->id = id; req->method = method; req->scheme = scheme; req->authority = authority; diff --git a/modules/http2/h2_push.c b/modules/http2/h2_push.c index df5632af1e..042042704e 100644 --- a/modules/http2/h2_push.c +++ b/modules/http2/h2_push.c @@ -34,7 +34,7 @@ #include "h2_util.h" #include "h2_push.h" #include "h2_request.h" -#include "h2_response.h" +#include "h2_headers.h" #include "h2_session.h" #include "h2_stream.h" @@ -58,6 +58,7 @@ static const char *policy_str(h2_push_policy policy) typedef struct { const h2_request *req; + int push_policy; apr_pool_t *pool; apr_array_header_t *pushes; const char *s; @@ -336,7 +337,7 @@ static int add_push(link_ctx *ctx) */ path = apr_uri_unparse(ctx->pool, &uri, APR_URI_UNP_OMITSITEPART); push = apr_pcalloc(ctx->pool, sizeof(*push)); - switch (ctx->req->push_policy) { + switch (ctx->push_policy) { case H2_PUSH_HEAD: method = "HEAD"; break; @@ -350,7 +351,7 @@ static int add_push(link_ctx *ctx) ctx->req->authority, path, headers, ctx->req->serialize); /* atm, we do not push on pushes */ - h2_request_end_headers(req, ctx->pool, 1, 0); + h2_request_end_headers(req, ctx->pool, 1); push->req = req; if (!ctx->pushes) { @@ -427,10 +428,10 @@ static int head_iter(void *ctx, const char *key, const char *value) return 1; } -apr_array_header_t *h2_push_collect(apr_pool_t *p, const h2_request *req, - const h2_response *res) +apr_array_header_t *h2_push_collect(apr_pool_t *p, const h2_request *req, + int push_policy, const h2_headers *res) { - if (req && req->push_policy != H2_PUSH_NONE) { + if (req && push_policy != H2_PUSH_NONE) { /* Collect push candidates from the request/response pair. * * One source for pushes are "rel=preload" link headers @@ -444,11 +445,13 @@ apr_array_header_t *h2_push_collect(apr_pool_t *p, const h2_request *req, memset(&ctx, 0, sizeof(ctx)); ctx.req = req; + ctx.push_policy = push_policy; ctx.pool = p; apr_table_do(head_iter, &ctx, res->headers, NULL); if (ctx.pushes) { - apr_table_setn(res->headers, "push-policy", policy_str(req->push_policy)); + apr_table_setn(res->headers, "push-policy", + policy_str(push_policy)); } return ctx.pushes; } @@ -681,7 +684,7 @@ apr_array_header_t *h2_push_diary_update(h2_session *session, apr_array_header_t apr_array_header_t *h2_push_collect_update(h2_stream *stream, const struct h2_request *req, - const struct h2_response *res) + const struct h2_headers *res) { h2_session *session = stream->session; const char *cache_digest = apr_table_get(req->headers, "Cache-Digest"); @@ -698,7 +701,7 @@ apr_array_header_t *h2_push_collect_update(h2_stream *stream, session->id, cache_digest); } } - pushes = h2_push_collect(stream->pool, req, res); + pushes = h2_push_collect(stream->pool, req, stream->push_policy, res); return h2_push_diary_update(stream->session, pushes); } diff --git a/modules/http2/h2_push.h b/modules/http2/h2_push.h index ae1ff6281a..bfe204f5a4 100644 --- a/modules/http2/h2_push.h +++ b/modules/http2/h2_push.h @@ -18,7 +18,7 @@ #include "h2.h" struct h2_request; -struct h2_response; +struct h2_headers; struct h2_ngheader; struct h2_session; struct h2_stream; @@ -58,7 +58,8 @@ struct h2_push_diary { */ apr_array_header_t *h2_push_collect(apr_pool_t *p, const struct h2_request *req, - const struct h2_response *res); + int push_policy, + const struct h2_headers *res); /** * Create a new push diary for the given maximum number of entries. @@ -81,7 +82,7 @@ apr_array_header_t *h2_push_diary_update(struct h2_session *session, apr_array_h */ apr_array_header_t *h2_push_collect_update(struct h2_stream *stream, const struct h2_request *req, - const struct h2_response *res); + const struct h2_headers *res); /** * Get a cache digest as described in * https://datatracker.ietf.org/doc/draft-kazuho-h2-cache-digest/ diff --git a/modules/http2/h2_request.c b/modules/http2/h2_request.c index 63dceb9d55..d2d7df472c 100644 --- a/modules/http2/h2_request.c +++ b/modules/http2/h2_request.c @@ -36,13 +36,6 @@ #include "h2_util.h" -static apr_status_t inspect_clen(h2_request *req, const char *s) -{ - char *end; - req->content_length = apr_strtoi64(s, &end, 10); - return (s == end)? APR_EINVAL : APR_SUCCESS; -} - typedef struct { apr_table_t *headers; apr_pool_t *pool; @@ -59,7 +52,6 @@ static int set_h1_header(void *ctx, const char *key, const char *value) } apr_status_t h2_request_rcreate(h2_request **preq, apr_pool_t *pool, - int stream_id, int initiated_on, request_rec *r) { h2_request *req; @@ -86,8 +78,6 @@ apr_status_t h2_request_rcreate(h2_request **preq, apr_pool_t *pool, } req = apr_pcalloc(pool, sizeof(*req)); - req->id = stream_id; - req->initiated_on = initiated_on; req->method = apr_pstrdup(pool, r->method); req->scheme = scheme; req->authority = authority; @@ -121,8 +111,7 @@ apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool, if (!apr_is_empty_table(req->headers)) { ap_log_perror(APLOG_MARK, APLOG_ERR, 0, pool, APLOGNO(02917) - "h2_request(%d): pseudo header after request start", - req->id); + "h2_request: pseudo header after request start"); return APR_EGENERAL; } @@ -148,8 +137,8 @@ apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool, strncpy(buffer, name, (nlen > 31)? 31 : nlen); ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, pool, APLOGNO(02954) - "h2_request(%d): ignoring unknown pseudo header %s", - req->id, buffer); + "h2_request: ignoring unknown pseudo header %s", + buffer); } } else { @@ -160,8 +149,7 @@ apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool, return status; } -apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, - int eos, int push) +apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos) { const char *s; @@ -181,21 +169,9 @@ apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, } s = apr_table_get(req->headers, "Content-Length"); - if (s) { - if (inspect_clen(req, s) != APR_SUCCESS) { - ap_log_perror(APLOG_MARK, APLOG_WARNING, APR_EINVAL, pool, - APLOGNO(02959) - "h2_request(%d): content-length value not parsed: %s", - req->id, s); - return APR_EINVAL; - } - req->body = 1; - } - else { + if (!s) { /* no content-length given */ - req->content_length = -1; - req->body = !eos; - if (req->body) { + if (!eos) { /* We have not seen a content-length and have no eos, * simulate a chunked encoding for our HTTP/1.1 infrastructure, * in case we have "H2SerializeHeaders on" here @@ -204,67 +180,16 @@ apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, apr_table_mergen(req->headers, "Transfer-Encoding", "chunked"); } else if (apr_table_get(req->headers, "Content-Type")) { - /* If we have a content-type, but already see eos, no more + /* If we have a content-type, but already seen eos, no more * data will come. Signal a zero content length explicitly. */ apr_table_setn(req->headers, "Content-Length", "0"); } } - h2_push_policy_determine(req, pool, push); - - /* In the presence of trailers, force behaviour of chunked encoding */ - s = apr_table_get(req->headers, "Trailer"); - if (s && s[0]) { - req->trailers = apr_table_make(pool, 5); - if (!req->chunked) { - req->chunked = 1; - apr_table_mergen(req->headers, "Transfer-Encoding", "chunked"); - } - } - - return APR_SUCCESS; -} - -static apr_status_t add_h1_trailer(h2_request *req, apr_pool_t *pool, - const char *name, size_t nlen, - const char *value, size_t vlen) -{ - char *hname, *hvalue; - - if (h2_req_ignore_trailer(name, nlen)) { - return APR_SUCCESS; - } - - hname = apr_pstrndup(pool, name, nlen); - hvalue = apr_pstrndup(pool, value, vlen); - h2_util_camel_case_header(hname, nlen); - - apr_table_mergen(req->trailers, hname, hvalue); - return APR_SUCCESS; } - -apr_status_t h2_request_add_trailer(h2_request *req, apr_pool_t *pool, - const char *name, size_t nlen, - const char *value, size_t vlen) -{ - if (!req->trailers) { - ap_log_perror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, pool, APLOGNO(03059) - "h2_request(%d): unanounced trailers", - req->id); - return APR_EINVAL; - } - if (nlen == 0 || name[0] == ':') { - ap_log_perror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, pool, APLOGNO(03060) - "h2_request(%d): pseudo header in trailer", - req->id); - return APR_EINVAL; - } - return add_h1_trailer(req, pool, name, nlen, value, vlen); -} - h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src) { h2_request *dst = apr_pmemdup(p, src, sizeof(*dst)); @@ -273,9 +198,6 @@ h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src) dst->authority = apr_pstrdup(p, src->authority); dst->path = apr_pstrdup(p, src->path); dst->headers = apr_table_clone(p, src->headers); - if (src->trailers) { - dst->trailers = apr_table_clone(p, src->trailers); - } return dst; } @@ -346,8 +268,8 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *c) * request for a vhost where h2 is disabled --> 421. */ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03367) - "h2_request(%d): access_status=%d, request_create failed", - req->id, access_status); + "h2_request: access_status=%d, request_create failed", + access_status); ap_die(access_status, r); ap_update_child_status(c->sbh, SERVER_BUSY_LOG, r); ap_run_log_transaction(r); diff --git a/modules/http2/h2_request.h b/modules/http2/h2_request.h index 4a3f3ca285..faf9791194 100644 --- a/modules/http2/h2_request.h +++ b/modules/http2/h2_request.h @@ -19,7 +19,6 @@ #include "h2.h" apr_status_t h2_request_rcreate(h2_request **preq, apr_pool_t *pool, - int stream_id, int initiated_on, request_rec *r); apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool, @@ -30,8 +29,7 @@ apr_status_t h2_request_add_trailer(h2_request *req, apr_pool_t *pool, const char *name, size_t nlen, const char *value, size_t vlen); -apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, - int eos, int push); +apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos); h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src); diff --git a/modules/http2/h2_response.c b/modules/http2/h2_response.c deleted file mode 100644 index 85599dbcf5..0000000000 --- a/modules/http2/h2_response.c +++ /dev/null @@ -1,219 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <assert.h> -#include <stdio.h> - -#include <apr_strings.h> - -#include <httpd.h> -#include <http_core.h> -#include <http_log.h> -#include <util_time.h> - -#include <nghttp2/nghttp2.h> - -#include "h2_private.h" -#include "h2_filter.h" -#include "h2_h2.h" -#include "h2_util.h" -#include "h2_request.h" -#include "h2_response.h" - - -static apr_table_t *parse_headers(apr_array_header_t *hlines, apr_pool_t *pool) -{ - if (hlines) { - apr_table_t *headers = apr_table_make(pool, hlines->nelts); - int i; - - for (i = 0; i < hlines->nelts; ++i) { - char *hline = ((char **)hlines->elts)[i]; - char *sep = ap_strchr(hline, ':'); - if (!sep) { - ap_log_perror(APLOG_MARK, APLOG_WARNING, APR_EINVAL, pool, - APLOGNO(02955) "h2_response: invalid header[%d] '%s'", - i, (char*)hline); - /* not valid format, abort */ - return NULL; - } - (*sep++) = '\0'; - while (*sep == ' ' || *sep == '\t') { - ++sep; - } - - if (!h2_util_ignore_header(hline)) { - apr_table_merge(headers, hline, sep); - } - } - return headers; - } - else { - return apr_table_make(pool, 0); - } -} - -static const char *get_sos_filter(apr_table_t *notes) -{ - return notes? apr_table_get(notes, H2_RESP_SOS_NOTE) : NULL; -} - -static void check_clen(h2_response *response, request_rec *r, apr_pool_t *pool) -{ - - if (r && r->header_only) { - response->content_length = 0; - } - else if (response->headers) { - const char *s = apr_table_get(response->headers, "Content-Length"); - if (s) { - char *end; - response->content_length = apr_strtoi64(s, &end, 10); - if (s == end) { - ap_log_perror(APLOG_MARK, APLOG_WARNING, APR_EINVAL, - pool, APLOGNO(02956) - "h2_response: content-length" - " value not parsed: %s", s); - response->content_length = -1; - } - } - } -} - -static h2_response *h2_response_create_int(int stream_id, - int rst_error, - int http_status, - apr_table_t *headers, - apr_table_t *notes, - apr_pool_t *pool) -{ - h2_response *response; - - if (!headers) { - return NULL; - } - - response = apr_pcalloc(pool, sizeof(h2_response)); - if (response == NULL) { - return NULL; - } - - response->stream_id = stream_id; - response->rst_error = rst_error; - response->http_status = http_status? http_status : 500; - response->content_length = -1; - response->headers = headers; - response->sos_filter = get_sos_filter(notes); - - check_clen(response, NULL, pool); - return response; -} - - -h2_response *h2_response_create(int stream_id, - int rst_error, - int http_status, - apr_array_header_t *hlines, - apr_table_t *notes, - apr_pool_t *pool) -{ - return h2_response_create_int(stream_id, rst_error, http_status, - parse_headers(hlines, pool), notes, pool); -} - -h2_response *h2_response_rcreate(int stream_id, request_rec *r, int status, - apr_table_t *header, apr_pool_t *pool) -{ - h2_response *response = apr_pcalloc(pool, sizeof(h2_response)); - if (response == NULL) { - return NULL; - } - - response->stream_id = stream_id; - response->http_status = status; - response->content_length = -1; - response->headers = header? header : apr_table_make(pool, 5); - response->sos_filter = get_sos_filter(r->notes); - - check_clen(response, r, pool); - - if (response->http_status == HTTP_FORBIDDEN) { - const char *cause = apr_table_get(r->notes, "ssl-renegotiate-forbidden"); - if (cause) { - /* This request triggered a TLS renegotiation that is now allowed - * in HTTP/2. Tell the client that it should use HTTP/1.1 for this. - */ - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, response->http_status, r, - APLOGNO(03061) - "h2_response(%ld-%d): renegotiate forbidden, cause: %s", - (long)r->connection->id, stream_id, cause); - response->rst_error = H2_ERR_HTTP_1_1_REQUIRED; - } - } - - return response; -} - -h2_response *h2_response_die(int stream_id, apr_status_t type, - const struct h2_request *req, apr_pool_t *pool) -{ - apr_table_t *headers = apr_table_make(pool, 5); - char *date = NULL; - int status = (type >= 200 && type < 600)? type : 500; - - date = apr_palloc(pool, APR_RFC822_DATE_LEN); - ap_recent_rfc822_date(date, req? req->request_time : apr_time_now()); - apr_table_setn(headers, "Date", date); - apr_table_setn(headers, "Server", ap_get_server_banner()); - - return h2_response_create_int(stream_id, 0, status, headers, NULL, pool); -} - -h2_response *h2_response_clone(apr_pool_t *pool, h2_response *from) -{ - h2_response *to = apr_pcalloc(pool, sizeof(h2_response)); - - to->stream_id = from->stream_id; - to->http_status = from->http_status; - to->content_length = from->content_length; - to->sos_filter = from->sos_filter; - if (from->headers) { - to->headers = apr_table_clone(pool, from->headers); - } - if (from->trailers) { - to->trailers = apr_table_clone(pool, from->trailers); - } - return to; -} - -void h2_response_set_trailers(h2_response *response, apr_table_t *trailers) -{ - response->trailers = trailers; -} - -int h2_response_is_final(h2_response *response) -{ - return response->http_status >= 200; -} - -h2_response *h2_response_get_final(h2_response *response) -{ - for (/**/; response; response = response->next) { - if (h2_response_is_final(response)) { - return response; - } - } - return NULL; -} diff --git a/modules/http2/h2_response.h b/modules/http2/h2_response.h deleted file mode 100644 index bc2fdb936e..0000000000 --- a/modules/http2/h2_response.h +++ /dev/null @@ -1,76 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __mod_h2__h2_response__ -#define __mod_h2__h2_response__ - -#include "h2.h" - -/** - * Create the response from the status and parsed header lines. - * @param stream_id id of the stream to create the response for - * @param rst_error error for reset or 0 - * @param http_status http status code of response - * @param hlines the text lines of the response header - * @param pool the memory pool to use - */ -h2_response *h2_response_create(int stream_id, - int rst_error, - int http_status, - apr_array_header_t *hlines, - apr_table_t *notes, - apr_pool_t *pool); - -/** - * Create the response from the given request_rec. - * @param stream_id id of the stream to create the response for - * @param r the request record which was processed - * @param header the headers of the response - * @param pool the memory pool to use - */ -h2_response *h2_response_rcreate(int stream_id, request_rec *r, int status, - apr_table_t *header, apr_pool_t *pool); - -/** - * Create the response for the given error. - * @param stream_id id of the stream to create the response for - * @param type the error code - * @param req the original h2_request - * @param pool the memory pool to use - */ -h2_response *h2_response_die(int stream_id, apr_status_t type, - const struct h2_request *req, apr_pool_t *pool); - -/** - * Deep copies the response into a new pool. - * @param pool the pool to use for the clone - * @param from the response to clone - * @return the cloned response - */ -h2_response *h2_response_clone(apr_pool_t *pool, h2_response *from); - -/** - * Set the trailers in the response. Will replace any existing trailers. Will - * *not* clone the table. - * - * @param response the repsone to set the trailers for - * @param trailers the trailers to set - */ -void h2_response_set_trailers(h2_response *response, apr_table_t *trailers); - -int h2_response_is_final(h2_response *response); -h2_response *h2_response_get_final(h2_response *response); - -#endif /* defined(__mod_h2__h2_response__) */ diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index cf29a0281d..a941ee0b80 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -38,9 +38,8 @@ #include "h2_mplx.h" #include "h2_push.h" #include "h2_request.h" -#include "h2_response.h" +#include "h2_headers.h" #include "h2_stream.h" -#include "h2_from_h1.h" #include "h2_task.h" #include "h2_session.h" #include "h2_util.h" @@ -407,7 +406,7 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame, status = h2_stream_add_header(stream, (const char *)name, namelen, (const char *)value, valuelen); - if (status != APR_SUCCESS && !stream->response) { + if (status != APR_SUCCESS && !h2_stream_is_ready(stream)) { return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; } return 0; @@ -1138,6 +1137,10 @@ static apr_status_t h2_session_start(h2_session *session, int *rv) return status; } +static apr_status_t on_stream_headers(h2_session *session, h2_stream *stream, + h2_headers *headers, apr_off_t len, + int eos); + static ssize_t stream_data_cb(nghttp2_session *ng2s, int32_t stream_id, uint8_t *buf, @@ -1171,8 +1174,8 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, session->id, (int)stream_id); return NGHTTP2_ERR_CALLBACK_FAILURE; } - - status = h2_stream_out_prepare(stream, &nread, &eos); + + status = h2_stream_out_prepare(stream, &nread, &eos, NULL); if (nread) { *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY; } @@ -1191,7 +1194,6 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, * it. Remember at our h2_stream that we need to do this. */ nread = 0; - h2_mplx_suspend_stream(session->mplx, stream->id); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071) "h2_stream(%ld-%d): suspending", session->id, (int)stream_id); @@ -1206,25 +1208,8 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, } if (eos) { - apr_table_t *trailers = h2_stream_get_trailers(stream); - if (trailers && !apr_is_empty_table(trailers)) { - h2_ngheader *nh; - int rv; - - nh = h2_util_ngheader_make(stream->pool, trailers); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03072) - "h2_stream(%ld-%d): submit %d trailers", - session->id, (int)stream_id,(int) nh->nvlen); - rv = nghttp2_submit_trailer(ng2s, stream->id, nh->nv, nh->nvlen); - if (rv < 0) { - nread = rv; - } - *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM; - } - *data_flags |= NGHTTP2_DATA_FLAG_EOF; } - return (ssize_t)nread; } @@ -1423,83 +1408,56 @@ static apr_status_t h2_session_send(h2_session *session) } /** - * A stream was resumed as new output data arrived. + * headers for the stream are ready. */ -static apr_status_t on_stream_resume(void *ctx, int stream_id) +static apr_status_t on_stream_headers(h2_session *session, h2_stream *stream, + h2_headers *headers, apr_off_t len, + int eos) { - h2_session *session = ctx; - h2_stream *stream = get_stream(session, stream_id); apr_status_t status = APR_SUCCESS; - - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): on_resume", session->id, stream_id); - if (stream) { - int rv; - if (stream->rst_error) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03466) - "h2_stream(%ld-%d): RST_STREAM, err=%d", - session->id, stream->id, stream->rst_error); - rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, - stream->id, stream->rst_error); - } - else { - rv = nghttp2_session_resume_data(session->ngh2, stream_id); - } - session->have_written = 1; - ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)? - APLOG_ERR : APLOG_DEBUG, 0, session->c, - APLOGNO(02936) - "h2_stream(%ld-%d): resuming %s", - session->id, stream->id, rv? nghttp2_strerror(rv) : ""); - } - return status; -} - -/** - * A response for the stream is ready. - */ -static apr_status_t on_stream_response(void *ctx, int stream_id) -{ - h2_session *session = ctx; - h2_stream *stream = get_stream(session, stream_id); - apr_status_t status = APR_SUCCESS; - h2_response *response; int rv = 0; - AP_DEBUG_ASSERT(session); + ap_assert(session); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): on_response", session->id, stream_id); - if (!stream) { - return APR_NOTFOUND; - } - else if (!stream->response) { + "h2_stream(%ld-%d): on_headers", session->id, stream->id); + if (!headers) { int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR); - - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074) + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03466) "h2_stream(%ld-%d): RST_STREAM, err=%d", session->id, stream->id, err); - rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, stream->id, err); goto leave; } - - while ((response = h2_stream_get_unsent_response(stream)) != NULL) { + else if (headers->status < 100) { + rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, + stream->id, headers->status); + goto leave; + } + else if (stream->has_response) { + h2_ngheader *nh; + int rv; + + nh = h2_util_ngheader_make(stream->pool, headers->headers); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03072) + "h2_stream(%ld-%d): submit %d trailers", + session->id, (int)stream->id,(int) nh->nvlen); + rv = nghttp2_submit_trailer(session->ngh2, stream->id, nh->nv, nh->nvlen); + goto leave; + } + else { nghttp2_data_provider provider, *pprovider = NULL; h2_ngheader *ngh; + apr_table_t *hout; const h2_priority *prio; - - if (stream->submitted) { - rv = NGHTTP2_PROTOCOL_ERROR; - goto leave; - } + const char *note; ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073) "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u", - session->id, stream->id, response->http_status, + session->id, stream->id, headers->status, (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id)); - if (response->content_length != 0) { + if (!eos || len > 0) { memset(&provider, 0, sizeof(provider)); provider.source.fd = stream->id; provider.read_callback = stream_data_cb; @@ -1522,23 +1480,36 @@ static apr_status_t on_stream_response(void *ctx, int stream_id) * also have the pushed ones as well. */ if (!stream->initiated_on - && h2_response_is_final(response) - && H2_HTTP_2XX(response->http_status) + && h2_headers_are_response(headers) + && H2_HTTP_2XX(headers->status) && h2_session_push_enabled(session)) { - h2_stream_submit_pushes(stream); + h2_stream_submit_pushes(stream, headers); } - prio = h2_stream_get_priority(stream); + prio = h2_stream_get_priority(stream, headers); if (prio) { h2_session_set_prio(session, stream, prio); } - ngh = h2_util_ngheader_make_res(stream->pool, response->http_status, - response->headers); - rv = nghttp2_submit_response(session->ngh2, response->stream_id, + hout = headers->headers; + note = apr_table_get(headers->notes, H2_FILTER_DEBUG_NOTE); + if (note && !strcmp("on", note)) { + int32_t connFlowIn, connFlowOut; + + connFlowIn = nghttp2_session_get_effective_local_window_size(session->ngh2); + connFlowOut = nghttp2_session_get_remote_window_size(session->ngh2); + hout = apr_table_clone(stream->pool, hout); + apr_table_setn(hout, "conn-flow-in", + apr_itoa(stream->pool, connFlowIn)); + apr_table_setn(hout, "conn-flow-out", + apr_itoa(stream->pool, connFlowOut)); + } + + ngh = h2_util_ngheader_make_res(stream->pool, headers->status, hout); + rv = nghttp2_submit_response(session->ngh2, stream->id, ngh->nv, ngh->nvlen, pprovider); - stream->submitted = h2_response_is_final(response); + stream->has_response = h2_headers_are_response(headers); session->have_written = 1; if (stream->initiated_on) { @@ -1574,6 +1545,48 @@ leave: return status; } +/** + * A stream was resumed as new output data arrived. + */ +static apr_status_t on_stream_resume(void *ctx, int stream_id) +{ + h2_session *session = ctx; + h2_stream *stream = get_stream(session, stream_id); + apr_status_t status = APR_EAGAIN; + int rv; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_stream(%ld-%d): on_resume", session->id, stream_id); + if (stream) { + apr_off_t len = 0; + int eos = 0; + h2_headers *headers = NULL; + + send_headers: + status = h2_stream_out_prepare(stream, &len, &eos, &headers); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, + "h2_stream(%ld-%d): prepared len=%ld, eos=%d", + session->id, stream_id, (long)len, eos); + if (headers) { + status = on_stream_headers(session, stream, headers, len, eos); + if (status != APR_SUCCESS) { + return status; + } + goto send_headers; + } + else if (status != APR_EAGAIN) { + rv = nghttp2_session_resume_data(session->ngh2, stream_id); + session->have_written = 1; + ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)? + APLOG_ERR : APLOG_DEBUG, 0, session->c, + APLOGNO(02936) + "h2_stream(%ld-%d): resuming %s", + session->id, stream->id, rv? nghttp2_strerror(rv) : ""); + } + } + return status; +} + static apr_status_t h2_session_receive(void *ctx, const char *data, apr_size_t len, apr_size_t *readlen) { @@ -1664,40 +1677,6 @@ static apr_status_t h2_session_read(h2_session *session, int block) return rstatus; } -static int unsubmitted_iter(void *ctx, void *val) -{ - h2_stream *stream = val; - if (h2_stream_needs_submit(stream)) { - *((int *)ctx) = 1; - return 0; - } - return 1; -} - -static int has_unsubmitted_streams(h2_session *session) -{ - int has_unsubmitted = 0; - h2_ihash_iter(session->streams, unsubmitted_iter, &has_unsubmitted); - return has_unsubmitted; -} - -static int suspended_iter(void *ctx, void *val) -{ - h2_stream *stream = val; - if (h2_stream_is_suspended(stream)) { - *((int *)ctx) = 1; - return 0; - } - return 1; -} - -static int has_suspended_streams(h2_session *session) -{ - int has_suspended = 0; - h2_ihash_iter(session->streams, suspended_iter, &has_suspended); - return has_suspended; -} - static const char *StateNames[] = { "INIT", /* H2_SESSION_ST_INIT */ "DONE", /* H2_SESSION_ST_DONE */ @@ -1842,8 +1821,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg) session->id, session->open_streams); h2_conn_io_flush(&session->io); if (session->open_streams > 0) { - if (has_unsubmitted_streams(session) - || has_suspended_streams(session)) { + if (h2_mplx_awaits_data(session->mplx)) { /* waiting for at least one stream to produce data */ transit(session, "no io", H2_SESSION_ST_WAIT); } @@ -2207,7 +2185,6 @@ apr_status_t h2_session_process(h2_session *session, int async) /* trigger window updates, stream resumes and submits */ status = h2_mplx_dispatch_master_events(session->mplx, on_stream_resume, - on_stream_response, session); if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c, diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index e905a390a2..4d21cb86f5 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -49,7 +49,6 @@ struct h2_mplx; struct h2_priority; struct h2_push; struct h2_push_diary; -struct h2_response; struct h2_session; struct h2_stream; struct h2_task; @@ -187,11 +186,6 @@ void h2_session_abort(h2_session *session, apr_status_t reason); */ void h2_session_close(h2_session *session); -/* Start submitting the response to a stream request. This is possible - * once we have all the response headers. */ -apr_status_t h2_session_handle_response(h2_session *session, - struct h2_stream *stream); - /** * Create and register a new stream under the given id. * diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 7720237b10..35747710cd 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -16,6 +16,8 @@ #include <assert.h> #include <stddef.h> +#include <apr_strings.h> + #include <httpd.h> #include <http_core.h> #include <http_connection.h> @@ -29,11 +31,10 @@ #include "h2_conn.h" #include "h2_config.h" #include "h2_h2.h" -#include "h2_filter.h" #include "h2_mplx.h" #include "h2_push.h" #include "h2_request.h" -#include "h2_response.h" +#include "h2_headers.h" #include "h2_session.h" #include "h2_stream.h" #include "h2_task.h" @@ -62,8 +63,8 @@ static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, char *tag) apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); len = h2_util_bb_print(buffer, bmax, tag, "", s->buffer); - ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%ld-%d): %s", - c->id, s->id, len? buffer : line); + ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%s): %s", + c->log_id, len? buffer : line); } } @@ -150,6 +151,23 @@ static int output_open(h2_stream *stream) } } +static void prep_output(h2_stream *stream) { + conn_rec *c = stream->session->c; + if (!stream->buffer) { + stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc); + } +} + +static void prepend_response(h2_stream *stream, h2_headers *response) +{ + conn_rec *c = stream->session->c; + apr_bucket *b; + + prep_output(stream); + b = h2_bucket_headers_create(c->bucket_alloc, response); + APR_BRIGADE_INSERT_HEAD(stream->buffer, b); +} + static apr_status_t stream_pool_cleanup(void *ctx) { h2_stream *stream = ctx; @@ -252,21 +270,6 @@ void h2_stream_rst(h2_stream *stream, int error_code) stream->session->id, stream->id, error_code); } -struct h2_response *h2_stream_get_response(h2_stream *stream) -{ - return stream->response; -} - -struct h2_response *h2_stream_get_unsent_response(h2_stream *stream) -{ - h2_response *unsent = (stream->last_sent? - stream->last_sent->next : stream->response); - if (unsent) { - stream->last_sent = unsent; - } - return unsent; -} - apr_status_t h2_stream_set_request_rec(h2_stream *stream, request_rec *r) { h2_request *req; @@ -277,8 +280,7 @@ apr_status_t h2_stream_set_request_rec(h2_stream *stream, request_rec *r) if (stream->rst_error) { return APR_ECONNRESET; } - status = h2_request_rcreate(&req, stream->pool, stream->id, - stream->initiated_on, r); + status = h2_request_rcreate(&req, stream->pool, r); ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058) "h2_request(%d): set_request_rec %s host=%s://%s%s", stream->id, req->method, req->scheme, req->authority, @@ -295,13 +297,40 @@ apr_status_t h2_stream_set_request(h2_stream *stream, const h2_request *r) return APR_SUCCESS; } +static apr_status_t add_trailer(h2_stream *stream, + const char *name, size_t nlen, + const char *value, size_t vlen) +{ + conn_rec *c = stream->session->c; + char *hname, *hvalue; + + if (nlen == 0 || name[0] == ':') { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, c, APLOGNO(03060) + "h2_request(%ld-%d): pseudo header in trailer", + c->id, stream->id); + return APR_EINVAL; + } + if (h2_req_ignore_trailer(name, nlen)) { + return APR_SUCCESS; + } + if (!stream->trailers) { + stream->trailers = apr_table_make(stream->pool, 5); + } + hname = apr_pstrndup(stream->pool, name, nlen); + hvalue = apr_pstrndup(stream->pool, value, vlen); + h2_util_camel_case_header(hname, nlen); + apr_table_mergen(stream->trailers, hname, hvalue); + + return APR_SUCCESS; +} + apr_status_t h2_stream_add_header(h2_stream *stream, const char *name, size_t nlen, const char *value, size_t vlen) { AP_DEBUG_ASSERT(stream); - if (!stream->response) { + if (!stream->has_response) { if (name[0] == ':') { if ((vlen) > stream->session->s->limit_req_line) { /* pseudo header: approximation of request line size check */ @@ -336,10 +365,7 @@ apr_status_t h2_stream_add_header(h2_stream *stream, } if (h2_stream_is_scheduled(stream)) { - /* FIXME: this is not clean. we modify a struct that is being processed - * by another thread potentially. */ - return h2_request_add_trailer((h2_request*)stream->request, stream->pool, - name, nlen, value, vlen); + return add_trailer(stream, name, nlen, value, vlen); } else { if (!stream->rtmp) { @@ -366,36 +392,38 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled, if (eos) { close_input(stream); } + + if (!stream->input) { + h2_beam_create(&stream->input, stream->pool, stream->id, "input", 0); + } - if (stream->response) { + if (h2_stream_is_ready(stream)) { /* already have a resonse, probably a HTTP error code */ return h2_mplx_process(stream->session->mplx, stream, cmp, ctx); } else if (!stream->request && stream->rtmp) { /* This is the common case: a h2_request was being assembled, now * it gets finalized and checked for completness */ - status = h2_request_end_headers(stream->rtmp, stream->pool, - eos, push_enabled); + status = h2_request_end_headers(stream->rtmp, stream->pool, eos); if (status == APR_SUCCESS) { - stream->rtmp->id = stream->id; - stream->rtmp->initiated_on = stream->initiated_on; stream->rtmp->serialize = h2_config_geti(stream->session->config, H2_CONF_SER_HEADERS); stream->request = stream->rtmp; stream->rtmp = NULL; stream->scheduled = 1; - stream->input_remaining = stream->request->content_length; + stream->push_policy = h2_push_policy_determine(stream->request->headers, + stream->pool, push_enabled); + status = h2_mplx_process(stream->session->mplx, stream, cmp, ctx); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, "h2_stream(%ld-%d): scheduled %s %s://%s%s " - "clen=%ld, body=%d, chunked=%d", + "chunked=%d", stream->session->id, stream->id, stream->request->method, stream->request->scheme, stream->request->authority, stream->request->path, - (long)stream->request->content_length, - stream->request->body, stream->request->chunked); + stream->request->chunked); return status; } } @@ -420,21 +448,36 @@ int h2_stream_is_scheduled(const h2_stream *stream) apr_status_t h2_stream_close_input(h2_stream *stream) { + conn_rec *c = stream->session->c; apr_status_t status = APR_SUCCESS; - - AP_DEBUG_ASSERT(stream); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, "h2_stream(%ld-%d): closing input", stream->session->id, stream->id); - if (stream->rst_error) { return APR_ECONNRESET; } - if (close_input(stream) && stream->input) { - status = h2_beam_close(stream->input); + if (!stream->input) { + h2_beam_create(&stream->input, stream->pool, stream->id, "input", 0); } - return status; + + if (stream->trailers && !apr_is_empty_table(stream->trailers)) { + h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers, + NULL, stream->pool); + apr_bucket *b = h2_bucket_headers_create(c->bucket_alloc, r); + apr_bucket_brigade *tmp; + + tmp = apr_brigade_create(stream->pool, c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(tmp, b); + status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); + apr_brigade_destroy(tmp); + + stream->trailers = NULL; + } + + close_input(stream); + return h2_beam_close(stream->input); } apr_status_t h2_stream_write_data(h2_stream *stream, @@ -459,52 +502,22 @@ apr_status_t h2_stream_write_data(h2_stream *stream, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_stream(%ld-%d): add %ld input bytes", stream->session->id, stream->id, (long)len); - - if (!stream->request->chunked) { - stream->input_remaining -= len; - if (stream->input_remaining < 0) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c, - APLOGNO(02961) - "h2_stream(%ld-%d): got %ld more content bytes than announced " - "in content-length header: %ld", - stream->session->id, stream->id, - (long)stream->request->content_length, - -(long)stream->input_remaining); - h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR); - return APR_ECONNABORTED; - } - } tmp = apr_brigade_create(stream->pool, c->bucket_alloc); apr_brigade_write(tmp, NULL, NULL, data, len); - if (eos) { - APR_BRIGADE_INSERT_TAIL(tmp, apr_bucket_eos_create(c->bucket_alloc)); - close_input(stream); - } status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); apr_brigade_destroy(tmp); stream->in_data_frames++; stream->in_data_octets += len; + if (eos) { + return h2_stream_close_input(stream); + } + return status; } -void h2_stream_set_suspended(h2_stream *stream, int suspended) -{ - AP_DEBUG_ASSERT(stream); - stream->suspended = !!suspended; - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, - "h2_stream(%ld-%d): suspended=%d", - stream->session->id, stream->id, stream->suspended); -} - -int h2_stream_is_suspended(const h2_stream *stream) -{ - AP_DEBUG_ASSERT(stream); - return stream->suspended; -} - static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount) { conn_rec *c = stream->session->c; @@ -549,89 +562,64 @@ static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount) return status; } -apr_status_t h2_stream_add_response(h2_stream *stream, h2_response *response, - h2_bucket_beam *output) -{ - apr_status_t status = APR_SUCCESS; - conn_rec *c = stream->session->c; - h2_response **pr = &stream->response; - - if (!output_open(stream)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_stream(%ld-%d): output closed", - stream->session->id, stream->id); - return APR_ECONNRESET; - } - if (stream->submitted) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_stream(%ld-%d): already submitted final response", - stream->session->id, stream->id); - return APR_ECONNRESET; - } - - /* append */ - while (*pr) { - pr = &((*pr)->next); - } - *pr = response; - - if (h2_response_is_final(response)) { - stream->output = output; - stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc); - - h2_stream_filter(stream); - if (stream->output) { - status = fill_buffer(stream, 0); - } - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_stream(%ld-%d): set_response(%d)", - stream->session->id, stream->id, - stream->response->http_status); - return status; -} - apr_status_t h2_stream_set_error(h2_stream *stream, int http_status) { - h2_response *response; + h2_headers *response; - if (stream->submitted) { + if (h2_stream_is_ready(stream)) { return APR_EINVAL; } if (stream->rtmp) { stream->request = stream->rtmp; stream->rtmp = NULL; } - response = h2_response_die(stream->id, http_status, - stream->request, stream->pool); - return h2_stream_add_response(stream, response, NULL); + response = h2_headers_die(http_status, stream->request, stream->pool); + prepend_response(stream, response); + return APR_SUCCESS; } -static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9); +static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb) +{ + if (bb) { + apr_bucket *b = APR_BRIGADE_FIRST(bb); + while (b != APR_BRIGADE_SENTINEL(bb)) { + if (H2_BUCKET_IS_HEADERS(b)) { + return b; + } + b = APR_BUCKET_NEXT(b); + } + } + return NULL; +} -apr_status_t h2_stream_out_prepare(h2_stream *stream, - apr_off_t *plen, int *peos) +apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, + int *peos, h2_headers **presponse) { conn_rec *c = stream->session->c; apr_status_t status = APR_SUCCESS; apr_off_t requested; + apr_bucket *b, *e; + if (presponse) { + *presponse = NULL; + } + if (stream->rst_error) { *plen = 0; *peos = 1; return APR_ECONNRESET; } - - if (!stream->buffer) { - return APR_EAGAIN; - } + if (!output_open(stream)) { + return APR_ECONNRESET; + } + prep_output(stream); + if (*plen > 0) { - requested = H2MIN(*plen, DATA_CHUNK_SIZE); + requested = H2MIN(*plen, H2_DATA_CHUNK_SIZE); } else { - requested = DATA_CHUNK_SIZE; + requested = H2_DATA_CHUNK_SIZE; } *plen = requested; @@ -639,7 +627,7 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, h2_util_bb_avail(stream->buffer, plen, peos); if (!*peos && *plen < requested) { /* try to get more data */ - status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE); + status = fill_buffer(stream, (requested - *plen) + H2_DATA_CHUNK_SIZE); if (APR_STATUS_IS_EOF(status)) { apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->buffer, eos); @@ -653,17 +641,66 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, h2_util_bb_avail(stream->buffer, plen, peos); } H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post"); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s", - c->id, stream->id, (long)*plen, *peos, - (stream->response && stream->response->trailers)? - "yes" : "no"); - if (!*peos && !*plen && status == APR_SUCCESS) { - return APR_EAGAIN; + + b = APR_BRIGADE_FIRST(stream->buffer); + while (b != APR_BRIGADE_SENTINEL(stream->buffer)) { + e = APR_BUCKET_NEXT(b); + if (APR_BUCKET_IS_FLUSH(b)) { + APR_BUCKET_REMOVE(b); + apr_bucket_destroy(b); + } + else { + break; + } + b = e; } + + b = get_first_headers_bucket(stream->buffer); + if (b) { + /* there are HEADERS to submit */ + *peos = 0; + *plen = 0; + if (b == APR_BRIGADE_FIRST(stream->buffer)) { + if (presponse) { + *presponse = h2_bucket_headers_get(b); + APR_BUCKET_REMOVE(b); + apr_bucket_destroy(b); + status = APR_SUCCESS; + } + else { + /* someone needs to retrieve the response first */ + h2_mplx_keep_active(stream->session->mplx, stream->id); + status = APR_EAGAIN; + } + } + else { + apr_bucket *e = APR_BRIGADE_FIRST(stream->buffer); + while (e != APR_BRIGADE_SENTINEL(stream->buffer)) { + if (e == b) { + break; + } + else if (e->length != (apr_size_t)-1) { + *plen += e->length; + } + e = APR_BUCKET_NEXT(e); + } + } + } + + if (!*peos && !*plen && status == APR_SUCCESS + && (!presponse || !*presponse)) { + status = APR_EAGAIN; + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, + "h2_stream(%ld-%d): prepare, len=%ld eos=%d", + c->id, stream->id, (long)*plen, *peos); return status; } +static int is_not_headers(apr_bucket *b) +{ + return !H2_BUCKET_IS_HEADERS(b); +} apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, apr_off_t *plen, int *peos) @@ -674,7 +711,7 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, if (stream->rst_error) { return APR_ECONNRESET; } - status = h2_append_brigade(bb, stream->buffer, plen, peos); + status = h2_append_brigade(bb, stream->buffer, plen, peos, is_not_headers); if (status == APR_SUCCESS && !*peos && !*plen) { status = APR_EAGAIN; } @@ -690,27 +727,13 @@ int h2_stream_input_is_open(const h2_stream *stream) return input_open(stream); } -int h2_stream_needs_submit(const h2_stream *stream) -{ - switch (stream->state) { - case H2_STREAM_ST_OPEN: - case H2_STREAM_ST_CLOSED_INPUT: - case H2_STREAM_ST_CLOSED_OUTPUT: - case H2_STREAM_ST_CLOSED: - return !stream->submitted; - default: - return 0; - } -} - -apr_status_t h2_stream_submit_pushes(h2_stream *stream) +apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response) { apr_status_t status = APR_SUCCESS; apr_array_header_t *pushes; int i; - pushes = h2_push_collect_update(stream, stream->request, - stream->response); + pushes = h2_push_collect_update(stream, stream->request, response); if (pushes && !apr_is_empty_array(pushes)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, "h2_stream(%ld-%d): found %d push candidates", @@ -729,13 +752,14 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream) apr_table_t *h2_stream_get_trailers(h2_stream *stream) { - return stream->response? stream->response->trailers : NULL; + return NULL; } -const h2_priority *h2_stream_get_priority(h2_stream *stream) +const h2_priority *h2_stream_get_priority(h2_stream *stream, + h2_headers *response) { - if (stream->response && stream->initiated_on) { - const char *ctype = apr_table_get(stream->response->headers, "content-type"); + if (response && stream->initiated_on) { + const char *ctype = apr_table_get(response->headers, "content-type"); if (ctype) { /* FIXME: Not good enough, config needs to come from request->server */ return h2_config_get_priority(stream->session->config, ctype); @@ -767,3 +791,15 @@ const char *h2_stream_state_str(h2_stream *stream) } } +int h2_stream_is_ready(h2_stream *stream) +{ + if (stream->has_response) { + return 1; + } + else if (stream->buffer && get_first_headers_bucket(stream->buffer)) { + return 1; + } + return 0; +} + + diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index e871cb7ee2..346e4e2a22 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -25,16 +25,16 @@ * connection to the client. The h2_session writes to the h2_stream, * adding HEADERS and DATA and finally an EOS. When headers are done, * h2_stream is scheduled for handling, which is expected to produce - * a h2_response. + * a h2_headers. * - * The h2_response gives the HEADER frames to sent to the client, followed + * The h2_headers gives the HEADER frames to sent to the client, followed * by DATA frames read from the h2_stream until EOS is reached. */ struct h2_mplx; struct h2_priority; struct h2_request; -struct h2_response; +struct h2_headers; struct h2_session; struct h2_sos; struct h2_bucket_beam; @@ -51,27 +51,27 @@ struct h2_stream { apr_pool_t *pool; /* the memory pool for this stream */ const struct h2_request *request; /* the request made in this stream */ struct h2_request *rtmp; /* request being assembled */ + apr_table_t *trailers; /* optional incoming trailers */ struct h2_bucket_beam *input; int request_headers_added; /* number of request headers added */ + unsigned int push_policy; /* which push policy to use for this request */ - struct h2_response *response; - struct h2_response *last_sent; struct h2_bucket_beam *output; apr_bucket_brigade *buffer; apr_array_header_t *files; /* apr_file_t* we collected during I/O */ int rst_error; /* stream error for RST_STREAM */ unsigned int aborted : 1; /* was aborted */ - unsigned int suspended : 1; /* DATA sending has been suspended */ unsigned int scheduled : 1; /* stream has been scheduled */ unsigned int started : 1; /* stream has started processing */ - unsigned int submitted : 1; /* response HEADER has been sent */ + unsigned int has_response : 1; /* response headers are known */ - apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */ apr_off_t out_data_frames; /* # of DATA frames sent */ apr_off_t out_data_octets; /* # of DATA octets (payload) sent */ apr_off_t in_data_frames; /* # of DATA frames received */ apr_off_t in_data_octets; /* # of DATA octets (payload) received */ + + const char *sos_filter; }; @@ -188,22 +188,6 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled, */ int h2_stream_is_scheduled(const h2_stream *stream); -struct h2_response *h2_stream_get_response(h2_stream *stream); -struct h2_response *h2_stream_get_unsent_response(h2_stream *stream); - -/** - * Set the response for this stream. Invoked when all meta data for - * the stream response has been collected. - * - * @param stream the stream to set the response for - * @param response the response data for the stream - * @param bb bucket brigade with output data for the stream. Optional, - * may be incomplete. - */ -apr_status_t h2_stream_add_response(h2_stream *stream, - struct h2_response *response, - struct h2_bucket_beam *output); - /** * Set the HTTP error status as response. */ @@ -218,12 +202,13 @@ apr_status_t h2_stream_set_error(h2_stream *stream, int http_status); * may be read without blocking * @param peos (out) != 0 iff end of stream will be reached when reading plen * bytes (out value). + * @param presponse (out) the response of one became available * @return APR_SUCCESS if out information was computed successfully. * APR_EAGAIN if not data is available and end of stream has not been * reached yet. */ -apr_status_t h2_stream_out_prepare(h2_stream *stream, - apr_off_t *plen, int *peos); +apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, + int *peos, h2_headers **presponse); /** * Read a maximum number of bytes into the bucket brigade. @@ -251,20 +236,6 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, apr_table_t *h2_stream_get_trailers(h2_stream *stream); /** - * Set the suspended state of the stream. - * @param stream the stream to change state on - * @param suspended boolean value if stream is suspended - */ -void h2_stream_set_suspended(h2_stream *stream, int suspended); - -/** - * Check if the stream has been suspended. - * @param stream the stream to check - * @return != 0 iff stream is suspended. - */ -int h2_stream_is_suspended(const h2_stream *stream); - -/** * Check if the stream has open input. * @param stream the stream to check * @return != 0 iff stream has open input. @@ -272,24 +243,18 @@ int h2_stream_is_suspended(const h2_stream *stream); int h2_stream_input_is_open(const h2_stream *stream); /** - * Check if the stream has not submitted a response or RST yet. - * @param stream the stream to check - * @return != 0 iff stream has not submitted a response or RST. - */ -int h2_stream_needs_submit(const h2_stream *stream); - -/** * Submit any server push promises on this stream and schedule * the tasks connection with these. * * @param stream the stream for which to submit */ -apr_status_t h2_stream_submit_pushes(h2_stream *stream); +apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response); /** * Get priority information set for this stream. */ -const struct h2_priority *h2_stream_get_priority(h2_stream *stream); +const struct h2_priority *h2_stream_get_priority(h2_stream *stream, + h2_headers *response); /** * Return a textual representation of the stream state as in RFC 7540 @@ -297,4 +262,10 @@ const struct h2_priority *h2_stream_get_priority(h2_stream *stream); */ const char *h2_stream_state_str(h2_stream *stream); +/** + * Determine if stream is ready for submitting a response or a RST + * @param stream the stream to check + */ +int h2_stream_is_ready(h2_stream *stream); + #endif /* defined(__mod_h2__h2_stream__) */ diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 87ab619acc..773b3768cd 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -42,13 +42,27 @@ #include "h2_h2.h" #include "h2_mplx.h" #include "h2_request.h" -#include "h2_response.h" +#include "h2_headers.h" #include "h2_session.h" #include "h2_stream.h" #include "h2_task.h" #include "h2_worker.h" #include "h2_util.h" +static void H2_TASK_OUT_LOG(int lvl, h2_task *task, apr_bucket_brigade *bb, char *tag) +{ + if (APLOG_C_IS_LEVEL(task->c, lvl)) { + conn_rec *c = task->c; + char buffer[4 * 1024]; + const char *line = "(null)"; + apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); + + len = h2_util_bb_print(buffer, bmax, tag, "", bb); + ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%s): %s", + task->id, len? buffer : line); + } +} + /******************************************************************************* * task input handling ******************************************************************************/ @@ -60,90 +74,13 @@ static int input_ser_header(void *ctx, const char *name, const char *value) return 1; } -static void make_chunk(h2_task *task, apr_bucket_brigade *bb, - apr_bucket *first, apr_uint64_t chunk_len, - apr_bucket *tail) -{ - /* Surround the buckets [first, tail[ with new buckets carrying the - * HTTP/1.1 chunked encoding format. If tail is NULL, the chunk extends - * to the end of the brigade. */ - char buffer[128]; - apr_bucket *c; - int len; - - len = apr_snprintf(buffer, H2_ALEN(buffer), - "%"APR_UINT64_T_HEX_FMT"\r\n", chunk_len); - c = apr_bucket_heap_create(buffer, len, NULL, bb->bucket_alloc); - APR_BUCKET_INSERT_BEFORE(first, c); - c = apr_bucket_heap_create("\r\n", 2, NULL, bb->bucket_alloc); - if (tail) { - APR_BUCKET_INSERT_BEFORE(tail, c); - } - else { - APR_BRIGADE_INSERT_TAIL(bb, c); - } -} - -static apr_status_t input_handle_eos(h2_task *task, request_rec *r, - apr_bucket *b) -{ - apr_status_t status = APR_SUCCESS; - apr_bucket_brigade *bb = task->input.bb; - apr_table_t *t = task->request->trailers; - - if (task->input.chunked) { - apr_bucket_brigade *tmp = apr_brigade_split_ex(bb, b, NULL); - if (t && !apr_is_empty_table(t)) { - status = apr_brigade_puts(bb, NULL, NULL, "0\r\n"); - apr_table_do(input_ser_header, task, t, NULL); - status = apr_brigade_puts(bb, NULL, NULL, "\r\n"); - } - else { - status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n"); - } - APR_BRIGADE_CONCAT(bb, tmp); - apr_brigade_destroy(tmp); - } - else if (r && t && !apr_is_empty_table(t)){ - /* trailers passed in directly. */ - apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET); - } - task->input.eos_written = 1; - return status; -} - -static apr_status_t input_append_eos(h2_task *task, request_rec *r) -{ - apr_status_t status = APR_SUCCESS; - apr_bucket_brigade *bb = task->input.bb; - apr_table_t *t = task->request->trailers; - - if (task->input.chunked) { - if (t && !apr_is_empty_table(t)) { - status = apr_brigade_puts(bb, NULL, NULL, "0\r\n"); - apr_table_do(input_ser_header, task, t, NULL); - status = apr_brigade_puts(bb, NULL, NULL, "\r\n"); - } - else { - status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n"); - } - } - else if (r && t && !apr_is_empty_table(t)){ - /* trailers passed in directly. */ - apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET); - } - APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(bb->bucket_alloc)); - task->input.eos_written = 1; - return status; -} - static apr_status_t input_read(h2_task *task, ap_filter_t* f, apr_bucket_brigade* bb, ap_input_mode_t mode, apr_read_type_e block, apr_off_t readbytes) { apr_status_t status = APR_SUCCESS; - apr_bucket *b, *next, *first_data; - apr_off_t bblen = 0; + apr_bucket *b, *next; + apr_off_t bblen; apr_size_t rmax = ((readbytes <= APR_SIZE_MAX)? (apr_size_t)readbytes : APR_SIZE_MAX); @@ -160,28 +97,9 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f, } if (!task->input.bb) { - if (!task->input.eos_written) { - input_append_eos(task, f->r); - return APR_SUCCESS; - } return APR_EOF; } - /* - if (f->r && f->r->expecting_100) { - ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, f->c, - "h2_task(%s): need to send 100 Continue here", - task->id); - f->r->expecting_100 = 0; - } - if (task->r && task->r->expecting_100) { - ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, f->c, - "h2_task2(%s): need to send 100 Continue here", - task->id); - task->r->expecting_100 = 0; - } - */ - /* Cleanup brigades from those nasty 0 length non-meta buckets * that apr_brigade_split_line() sometimes produces. */ for (b = APR_BRIGADE_FIRST(task->input.bb); @@ -192,12 +110,11 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f, } } - while (APR_BRIGADE_EMPTY(task->input.bb) && !task->input.eos) { + while (APR_BRIGADE_EMPTY(task->input.bb)) { /* Get more input data for our request. */ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_task(%s): get more data from mplx, block=%d, " - "readbytes=%ld, queued=%ld", - task->id, block, (long)readbytes, (long)bblen); + "readbytes=%ld", task->id, block, (long)readbytes); /* Override the block mode we get called with depending on the input's * setting. */ @@ -219,7 +136,7 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f, status = APR_SUCCESS; } else if (APR_STATUS_IS_EOF(status)) { - task->input.eos = 1; + break; } else if (status != APR_SUCCESS) { return status; @@ -229,51 +146,14 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f, * chunked encoding if necessary */ h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, "input.beam recv raw", task->input.bb); - first_data = NULL; - bblen = 0; - for (b = APR_BRIGADE_FIRST(task->input.bb); - b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) { - next = APR_BUCKET_NEXT(b); - if (APR_BUCKET_IS_METADATA(b)) { - if (first_data && task->input.chunked) { - make_chunk(task, task->input.bb, first_data, bblen, b); - first_data = NULL; - bblen = 0; - } - if (APR_BUCKET_IS_EOS(b)) { - task->input.eos = 1; - input_handle_eos(task, f->r, b); - h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, - "input.bb after handle eos", - task->input.bb); - } - } - else if (b->length == 0) { - apr_bucket_delete(b); - } - else { - if (!first_data) { - first_data = b; - } - bblen += b->length; - } - } - if (first_data && task->input.chunked) { - make_chunk(task, task->input.bb, first_data, bblen, NULL); - } - if (h2_task_logio_add_bytes_in) { + apr_brigade_length(bb, 0, &bblen); h2_task_logio_add_bytes_in(f->c, bblen); } } - if (task->input.eos) { - if (!task->input.eos_written) { - input_append_eos(task, f->r); - } - if (APR_BRIGADE_EMPTY(task->input.bb)) { - return APR_EOF; - } + if (status == APR_EOF && APR_BRIGADE_EMPTY(task->input.bb)) { + return APR_EOF; } h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, @@ -333,32 +213,15 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f, * task output handling ******************************************************************************/ -static apr_status_t open_response(h2_task *task, h2_response *response) +static apr_status_t open_output(h2_task *task) { - if (!response) { - /* This happens currently when ap_die(status, r) is invoked - * by a read request filter. */ - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, APLOGNO(03204) - "h2_task(%s): write without response for %s %s %s", - task->id, - task->request->method, - task->request->authority, - task->request->path); - task->c->aborted = 1; - return APR_ECONNABORTED; - } - - if (h2_task_logio_add_bytes_out) { - /* count headers as if we'd do a HTTP/1.1 serialization */ - task->output.written = h2_util_table_bytes(response->headers, 3)+1; - h2_task_logio_add_bytes_out(task->c, task->output.written); - } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, APLOGNO(03348) - "h2_task(%s): open response to %s %s %s", + "h2_task(%s): open output to %s %s %s", task->id, task->request->method, task->request->authority, task->request->path); - return h2_mplx_out_open(task->mplx, task->stream_id, response); + task->output.opened = 1; + return h2_mplx_out_open(task->mplx, task->stream_id, task->output.beam); } static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb) @@ -367,23 +230,22 @@ static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb) apr_status_t status; apr_brigade_length(bb, 0, &written); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, - "h2_task(%s): write response body (%ld bytes)", - task->id, (long)written); - + H2_TASK_OUT_LOG(APLOG_TRACE2, task, bb, "h2_task send_out"); + /* engines send unblocking */ status = h2_beam_send(task->output.beam, bb, - task->blocking? APR_BLOCK_READ - : APR_NONBLOCK_READ); + task->assigned? APR_NONBLOCK_READ + : APR_BLOCK_READ); if (APR_STATUS_IS_EAGAIN(status)) { apr_brigade_length(bb, 0, &left); written -= left; status = APR_SUCCESS; } if (status == APR_SUCCESS) { - task->output.written += written; if (h2_task_logio_add_bytes_out) { h2_task_logio_add_bytes_out(task->c, written); } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, task->c, + "h2_task(%s): send_out done", task->id); } else { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, @@ -397,8 +259,8 @@ static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb) * request_rec out filter chain) into the h2_mplx for further sending * on the master connection. */ -static apr_status_t output_write(h2_task *task, ap_filter_t* f, - apr_bucket_brigade* bb) +static apr_status_t slave_out(h2_task *task, ap_filter_t* f, + apr_bucket_brigade* bb) { apr_bucket *b; apr_status_t status = APR_SUCCESS; @@ -448,7 +310,7 @@ static apr_status_t output_write(h2_task *task, ap_filter_t* f, if ((!task->output.bb || APR_BRIGADE_EMPTY(task->output.bb)) && !APR_BRIGADE_EMPTY(bb)) { /* check if we have a flush before the end-of-request */ - if (!task->output.response_open) { + if (!task->output.opened) { for (b = APR_BRIGADE_FIRST(bb); b != APR_BRIGADE_SENTINEL(bb); b = APR_BUCKET_NEXT(b)) { @@ -476,42 +338,40 @@ static apr_status_t output_write(h2_task *task, ap_filter_t* f, task->output.bb = apr_brigade_create(task->pool, task->c->bucket_alloc); } - return ap_save_brigade(f, &task->output.bb, &bb, task->pool); + status = ap_save_brigade(f, &task->output.bb, &bb, task->pool); + if (status != APR_SUCCESS) { + return status; + } } - if (!task->output.response_open + if (!task->output.opened && (flush || h2_beam_get_mem_used(task->output.beam) > (32*1024))) { /* if we have enough buffered or we got a flush bucket, open * the response now. */ - status = open_response(task, - h2_from_h1_get_response(task->output.from_h1)); - task->output.response_open = 1; + status = open_output(task); } - + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, task->c, + "h2_task(%s): slave_out leave", task->id); return status; } static apr_status_t output_finish(h2_task *task) { - apr_status_t status = APR_SUCCESS; - - if (!task->output.response_open) { - status = open_response(task, - h2_from_h1_get_response(task->output.from_h1)); - task->output.response_open = 1; + if (!task->output.opened) { + return open_output(task); } - return status; + return APR_SUCCESS; } /******************************************************************************* * task slave connection filters ******************************************************************************/ -static apr_status_t h2_filter_stream_input(ap_filter_t* filter, - apr_bucket_brigade* brigade, - ap_input_mode_t mode, - apr_read_type_e block, - apr_off_t readbytes) +static apr_status_t h2_filter_slave_input(ap_filter_t* filter, + apr_bucket_brigade* brigade, + ap_input_mode_t mode, + apr_read_type_e block, + apr_off_t readbytes) { h2_task *task = h2_ctx_cget_task(filter->c); AP_DEBUG_ASSERT(task); @@ -527,15 +387,22 @@ static apr_status_t h2_filter_continue(ap_filter_t* f, h2_task *task = h2_ctx_cget_task(f->c); apr_status_t status; - AP_DEBUG_ASSERT(task); + ap_assert(task); if (f->r->expecting_100 && ap_is_HTTP_SUCCESS(f->r->status)) { - h2_response *response; - - response = h2_response_rcreate(task->stream_id, f->r, HTTP_CONTINUE, - NULL, f->r->pool); - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, f->r, - "h2_task(%s): send 100 Continue", task->id); - status = open_response(task, response); + h2_headers *response; + apr_bucket_brigade *tmp; + apr_bucket *b; + + response = h2_headers_rcreate(f->r, HTTP_CONTINUE, NULL, f->r->pool); + tmp = apr_brigade_create(f->r->pool, f->c->bucket_alloc); + b = h2_bucket_headers_create(f->c->bucket_alloc, response); + APR_BRIGADE_INSERT_TAIL(tmp, b); + b = apr_bucket_flush_create(f->c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(tmp, b); + status = ap_pass_brigade(f->r->output_filters, tmp); + apr_brigade_destroy(tmp); + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, f->r, + "h2_task(%s): sent 100 Continue", task->id); if (status != APR_SUCCESS) { return status; } @@ -545,53 +412,26 @@ static apr_status_t h2_filter_continue(ap_filter_t* f, return ap_get_brigade(f->next, brigade, mode, block, readbytes); } -static apr_status_t h2_filter_stream_output(ap_filter_t* filter, - apr_bucket_brigade* brigade) +static apr_status_t h2_filter_slave_output(ap_filter_t* filter, + apr_bucket_brigade* brigade) { h2_task *task = h2_ctx_cget_task(filter->c); apr_status_t status; ap_assert(task); - status = output_write(task, filter, brigade); + status = slave_out(task, filter, brigade); if (status != APR_SUCCESS) { h2_task_rst(task, H2_ERR_INTERNAL_ERROR); } return status; } -static apr_status_t h2_filter_read_response(ap_filter_t* filter, - apr_bucket_brigade* bb) -{ - h2_task *task = h2_ctx_cget_task(filter->c); - AP_DEBUG_ASSERT(task); - if (!task->output.from_h1) { - return APR_ECONNABORTED; - } - return h2_from_h1_read_response(task->output.from_h1, filter, bb); -} - /******************************************************************************* * task things ******************************************************************************/ -apr_status_t h2_task_add_response(h2_task *task, h2_response *response) -{ - AP_DEBUG_ASSERT(response); - /* we used to clone the response into out own pool. But - * we have much tighter control over the EOR bucket nowadays, - * so just use the instance given */ - response->next = task->response; - task->response = response; - if (response->rst_error) { - h2_task_rst(task, response->rst_error); - } - return APR_SUCCESS; -} - - int h2_task_can_redo(h2_task *task) { - if (task->response_sent - || (task->input.beam && h2_beam_was_received(task->input.beam))) { + if (task->input.beam && h2_beam_was_received(task->input.beam)) { /* cannot repeat that. */ return 0; } @@ -602,7 +442,6 @@ int h2_task_can_redo(h2_task *task) { void h2_task_redo(h2_task *task) { - task->response = NULL; task->rst_error = 0; } @@ -612,7 +451,7 @@ void h2_task_rst(h2_task *task, int error) if (task->input.beam) { h2_beam_abort(task->input.beam); } - if (task->output.beam) { + if (!task->worker_done && task->output.beam) { h2_beam_abort(task->output.beam); } if (task->c) { @@ -644,17 +483,18 @@ void h2_task_register_hooks(void) ap_hook_process_connection(h2_task_process_conn, NULL, NULL, APR_HOOK_FIRST); - ap_register_output_filter("H2_RESPONSE", h2_response_output_filter, - NULL, AP_FTYPE_PROTOCOL); - ap_register_input_filter("H2_TO_H1", h2_filter_stream_input, + ap_register_input_filter("H2_SLAVE_IN", h2_filter_slave_input, NULL, AP_FTYPE_NETWORK); + ap_register_output_filter("H2_SLAVE_OUT", h2_filter_slave_output, + NULL, AP_FTYPE_NETWORK); + ap_register_input_filter("H2_CONTINUE", h2_filter_continue, NULL, AP_FTYPE_PROTOCOL); - ap_register_output_filter("H1_TO_H2", h2_filter_stream_output, - NULL, AP_FTYPE_NETWORK); - ap_register_output_filter("H1_TO_H2_RESP", h2_filter_read_response, + ap_register_input_filter("H2_REQUEST", h2_filter_request_in, + NULL, AP_FTYPE_PROTOCOL); + ap_register_output_filter("H2_RESPONSE", h2_headers_output_filter, NULL, AP_FTYPE_PROTOCOL); - ap_register_output_filter("H2_TRAILERS", h2_response_trailers_filter, + ap_register_output_filter("H2_TRAILERS_OUT", h2_filter_trailers_out, NULL, AP_FTYPE_PROTOCOL); } @@ -680,17 +520,15 @@ static int h2_task_pre_conn(conn_rec* c, void *arg) if (h2_ctx_is_task(ctx)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, "h2_h2, pre_connection, found stream task"); - - /* Add our own, network level in- and output filters. - */ - ap_add_input_filter("H2_TO_H1", NULL, NULL, c); - ap_add_output_filter("H1_TO_H2", NULL, NULL, c); + ap_add_input_filter("H2_SLAVE_IN", NULL, NULL, c); + ap_add_output_filter("H2_SLAVE_OUT", NULL, NULL, c); } return OK; } -h2_task *h2_task_create(conn_rec *c, const h2_request *req, - h2_bucket_beam *input, h2_mplx *mplx) +h2_task *h2_task_create(conn_rec *c, apr_uint32_t stream_id, + const h2_request *req, h2_bucket_beam *input, + h2_mplx *mplx) { apr_pool_t *pool; h2_task *task; @@ -704,18 +542,16 @@ h2_task *h2_task_create(conn_rec *c, const h2_request *req, if (task == NULL) { ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, c, APLOGNO(02941) "h2_task(%ld-%d): create stream task", - c->id, req->id); + c->id, stream_id); return NULL; } - task->id = apr_psprintf(pool, "%ld-%d", c->id, req->id); - task->stream_id = req->id; + task->id = apr_psprintf(pool, "%ld-%d", c->master->id, stream_id); + task->stream_id = stream_id; task->c = c; task->mplx = mplx; task->c->keepalives = mplx->c->keepalives; task->pool = pool; task->request = req; - task->ser_headers = req->serialize; - task->blocking = 1; task->input.beam = input; apr_thread_cond_create(&task->cond, pool); @@ -738,43 +574,23 @@ void h2_task_destroy(h2_task *task) } } -void h2_task_set_io_blocking(h2_task *task, int blocking) -{ - task->blocking = blocking; -} - apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread) { AP_DEBUG_ASSERT(task); - task->input.block = APR_BLOCK_READ; task->input.chunked = task->request->chunked; - task->input.eos = !task->request->body; - if (task->input.eos && !task->input.chunked && !task->ser_headers) { - /* We do not serialize/chunk and have eos already, no need to - * create a bucket brigade. */ - task->input.bb = NULL; - task->input.eos_written = 1; - } - else { - task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc); - if (task->ser_headers) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, - "h2_task(%s): serialize request %s %s", - task->id, task->request->method, task->request->path); - apr_brigade_printf(task->input.bb, NULL, - NULL, "%s %s HTTP/1.1\r\n", - task->request->method, task->request->path); - apr_table_do(input_ser_header, task, task->request->headers, NULL); - apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n"); - } - if (task->input.eos) { - input_append_eos(task, NULL); - } + task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc); + if (task->request->serialize) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + "h2_task(%s): serialize request %s %s", + task->id, task->request->method, task->request->path); + apr_brigade_printf(task->input.bb, NULL, + NULL, "%s %s HTTP/1.1\r\n", + task->request->method, task->request->path); + apr_table_do(input_ser_header, task, task->request->headers, NULL); + apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n"); } - task->output.from_h1 = h2_from_h1_create(task->stream_id, task->pool); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, "h2_task(%s): process connection", task->id); task->c->current_thread = thread; @@ -811,7 +627,6 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c) } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): start process_request", task->id); - task->r = r; ap_process_request(r); if (task->frozen) { @@ -819,10 +634,9 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c) "h2_task(%s): process_request frozen", task->id); } else { - task->r = NULL; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "h2_task(%s): process_request done", task->id); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_task(%s): process_request done", task->id); /* After the call to ap_process_request, the * request pool will have been deleted. We set @@ -858,7 +672,7 @@ static int h2_task_process_conn(conn_rec* c) ctx = h2_ctx_get(c, 0); if (h2_ctx_is_task(ctx)) { - if (!ctx->task->ser_headers) { + if (!ctx->task->request->serialize) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_h2, processing request directly"); h2_task_process_request(ctx->task, c); diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index 85b530fa3f..9c81a8693a 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -44,7 +44,6 @@ struct h2_mplx; struct h2_task; struct h2_req_engine; struct h2_request; -struct h2_response; struct h2_worker; typedef struct h2_task h2_task; @@ -56,36 +55,29 @@ struct h2_task { apr_pool_t *pool; const struct h2_request *request; - struct h2_response *response; + int rst_error; /* h2 related stream abort error */ struct { struct h2_bucket_beam *beam; - apr_bucket_brigade *bb; - apr_read_type_e block; unsigned int chunked : 1; unsigned int eos : 1; - unsigned int eos_written : 1; + apr_bucket_brigade *bb; + apr_bucket_brigade *bbchunk; } input; struct { struct h2_bucket_beam *beam; - struct h2_from_h1 *from_h1; unsigned int opened : 1; - unsigned int response_open : 1; + unsigned int sent_response : 1; unsigned int copy_files : 1; - apr_off_t written; apr_bucket_brigade *bb; } output; struct h2_mplx *mplx; struct apr_thread_cond_t *cond; - int rst_error; /* h2 related stream abort error */ unsigned int filters_set : 1; - unsigned int ser_headers : 1; unsigned int frozen : 1; - unsigned int blocking : 1; unsigned int detached : 1; - unsigned int response_sent : 1; /* a response has been sent to client */ unsigned int worker_started : 1; /* h2_worker started processing for this io */ unsigned int worker_done : 1; /* h2_worker finished for this io */ @@ -95,18 +87,16 @@ struct h2_task { struct h2_req_engine *engine; /* engine hosted by this task */ struct h2_req_engine *assigned; /* engine that task has been assigned to */ - request_rec *r; /* request being processed in this task */ }; -h2_task *h2_task_create(conn_rec *c, const struct h2_request *req, +h2_task *h2_task_create(conn_rec *c, apr_uint32_t stream_id, + const struct h2_request *req, struct h2_bucket_beam *input, struct h2_mplx *mplx); void h2_task_destroy(h2_task *task); apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread); -apr_status_t h2_task_add_response(h2_task *task, struct h2_response *response); - void h2_task_redo(h2_task *task); int h2_task_can_redo(h2_task *task); @@ -128,6 +118,4 @@ apr_status_t h2_task_freeze(h2_task *task); apr_status_t h2_task_thaw(h2_task *task); int h2_task_is_detached(h2_task *task); -void h2_task_set_io_blocking(h2_task *task, int blocking); - #endif /* defined(__mod_h2__h2_task__) */ diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 16c90a639a..f62bc6c8b7 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -903,11 +903,11 @@ apr_size_t h2_util_bucket_print(char *buffer, apr_size_t bmax, off += apr_snprintf(buffer+off, bmax-off, "eor"); } else { - off += apr_snprintf(buffer+off, bmax-off, "meta(unknown)"); + off += apr_snprintf(buffer+off, bmax-off, "%s", b->type->name); } } else { - const char *btype = "data"; + const char *btype = b->type->name; if (APR_BUCKET_IS_FILE(b)) { btype = "file"; } @@ -972,7 +972,8 @@ apr_size_t h2_util_bb_print(char *buffer, apr_size_t bmax, apr_status_t h2_append_brigade(apr_bucket_brigade *to, apr_bucket_brigade *from, apr_off_t *plen, - int *peos) + int *peos, + h2_bucket_gate *should_append) { apr_bucket *e; apr_off_t len = 0, remain = *plen; @@ -983,7 +984,10 @@ apr_status_t h2_append_brigade(apr_bucket_brigade *to, while (!APR_BRIGADE_EMPTY(from)) { e = APR_BRIGADE_FIRST(from); - if (APR_BUCKET_IS_METADATA(e)) { + if (!should_append(e)) { + goto leave; + } + else if (APR_BUCKET_IS_METADATA(e)) { if (APR_BUCKET_IS_EOS(e)) { *peos = 1; apr_bucket_delete(e); @@ -1002,7 +1006,7 @@ apr_status_t h2_append_brigade(apr_bucket_brigade *to, if (remain < e->length) { if (remain <= 0) { - return APR_SUCCESS; + goto leave; } apr_bucket_split(e, (apr_size_t)remain); } @@ -1013,7 +1017,7 @@ apr_status_t h2_append_brigade(apr_bucket_brigade *to, len += e->length; remain -= e->length; } - +leave: *plen = len; return APR_SUCCESS; } @@ -1282,7 +1286,6 @@ h2_request *h2_req_create(int id, apr_pool_t *pool, const char *method, { h2_request *req = apr_pcalloc(pool, sizeof(h2_request)); - req->id = id; req->method = method; req->scheme = scheme; req->authority = authority; @@ -1380,11 +1383,11 @@ int h2_util_frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen) /******************************************************************************* * push policy ******************************************************************************/ -void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_enabled) +int h2_push_policy_determine(apr_table_t *headers, apr_pool_t *p, int push_enabled) { h2_push_policy policy = H2_PUSH_NONE; if (push_enabled) { - const char *val = apr_table_get(req->headers, "accept-push-policy"); + const char *val = apr_table_get(headers, "accept-push-policy"); if (val) { if (ap_find_token(p, val, "fast-load")) { policy = H2_PUSH_FAST_LOAD; @@ -1407,6 +1410,6 @@ void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_en policy = H2_PUSH_DEFAULT; } } - req->push_policy = policy; + return policy; } diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index 8d2c195840..024c31381e 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -198,11 +198,12 @@ int h2_proxy_res_ignore_header(const char *name, size_t len); * account, see draft https://tools.ietf.org/html/draft-ruellan-http-accept-push-policy-00 * for details. * - * @param req the request to determine the policy for + * @param headers the http headers to inspect * @param p the pool to use * @param push_enabled if HTTP/2 server push is generally enabled for this request + * @return the push policy desired */ -void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_enabled); +int h2_push_policy_determine(apr_table_t *headers, apr_pool_t *p, int push_enabled); /******************************************************************************* * base64 url encoding, different table from normal base64 @@ -352,11 +353,12 @@ do { \ const char *line = "(null)"; \ apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \ len = h2_util_bb_print(buffer, bmax, (tag), "", (bb)); \ - ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%ld-%d): %s", \ - (c)->id, (int)(sid), (len? buffer : line)); \ + ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%s): %s", \ + (c)->log_id, (len? buffer : line)); \ } while(0) +typedef int h2_bucket_gate(apr_bucket *b); /** * Transfer buckets from one brigade to another with a limit on the * maximum amount of bytes transferred. Does no setaside magic, lifetime @@ -369,7 +371,8 @@ do { \ apr_status_t h2_append_brigade(apr_bucket_brigade *to, apr_bucket_brigade *from, apr_off_t *plen, - int *peos); + int *peos, + h2_bucket_gate *should_append); /** * Get an approximnation of the memory footprint of the given diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index d34ee06e9f..37d8e27a79 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.6.3-DEV" +#define MOD_HTTP2_VERSION "1.7.0-DEV" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010603 +#define MOD_HTTP2_VERSION_NUM 0x010700 #endif /* mod_h2_h2_version_h */ diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c index 65ea3d6aa4..8cf6e2de69 100644 --- a/modules/http2/mod_http2.c +++ b/modules/http2/mod_http2.c @@ -233,8 +233,11 @@ static const char *val_H2_PUSH(apr_pool_t *p, server_rec *s, if (ctx) { if (r) { h2_task *task = h2_ctx_get_task(ctx); - if (task && task->request->push_policy != H2_PUSH_NONE) { - return "on"; + if (task) { + h2_stream *stream = h2_mplx_stream_get(task->mplx, task->stream_id); + if (stream && stream->push_policy != H2_PUSH_NONE) { + return "on"; + } } } else if (c && h2_session_push_enabled(ctx->session)) { @@ -268,7 +271,10 @@ static const char *val_H2_PUSHED_ON(apr_pool_t *p, server_rec *s, if (ctx) { h2_task *task = h2_ctx_get_task(ctx); if (task && !H2_STREAM_CLIENT_INITIATED(task->stream_id)) { - return apr_itoa(p, task->request->initiated_on); + h2_stream *stream = h2_mplx_stream_get(task->mplx, task->stream_id); + if (stream) { + return apr_itoa(p, stream->initiated_on); + } } } return ""; diff --git a/modules/http2/mod_http2.dsp b/modules/http2/mod_http2.dsp index 949414872c..f26607470d 100644 --- a/modules/http2/mod_http2.dsp +++ b/modules/http2/mod_http2.dsp @@ -161,7 +161,7 @@ SOURCE=./h2_request.c # End Source File # Begin Source File -SOURCE=./h2_response.c +SOURCE=./h2_headers.c # End Source File # Begin Source File |