diff options
-rw-r--r-- | CHANGES | 5 | ||||
-rw-r--r-- | CMakeLists.txt | 2 | ||||
-rw-r--r-- | modules/http2/NWGNUmod_http2 | 2 | ||||
-rw-r--r-- | modules/http2/config2.m4 | 9 | ||||
-rw-r--r-- | modules/http2/h2_ctx.c | 12 | ||||
-rw-r--r-- | modules/http2/h2_ctx.h | 2 | ||||
-rw-r--r-- | modules/http2/h2_int_queue.c | 182 | ||||
-rw-r--r-- | modules/http2/h2_int_queue.h | 103 | ||||
-rw-r--r-- | modules/http2/h2_io.c | 17 | ||||
-rw-r--r-- | modules/http2/h2_io.h | 7 | ||||
-rw-r--r-- | modules/http2/h2_mplx.c | 369 | ||||
-rw-r--r-- | modules/http2/h2_mplx.h | 28 | ||||
-rw-r--r-- | modules/http2/h2_proxy_session.c | 839 | ||||
-rw-r--r-- | modules/http2/h2_proxy_session.h | 78 | ||||
-rw-r--r-- | modules/http2/h2_session.c | 16 | ||||
-rw-r--r-- | modules/http2/h2_task.c | 4 | ||||
-rw-r--r-- | modules/http2/h2_task_input.c | 30 | ||||
-rw-r--r-- | modules/http2/h2_task_input.h | 3 | ||||
-rw-r--r-- | modules/http2/h2_util.c | 2 | ||||
-rw-r--r-- | modules/http2/h2_worker.c | 51 | ||||
-rw-r--r-- | modules/http2/h2_workers.c | 1 | ||||
-rw-r--r-- | modules/http2/h2_workers.h | 1 | ||||
-rw-r--r-- | modules/http2/mod_http2.c | 38 | ||||
-rw-r--r-- | modules/http2/mod_http2.dsp | 8 | ||||
-rw-r--r-- | modules/http2/mod_http2.h | 24 | ||||
-rw-r--r-- | modules/http2/mod_proxy_http2.c | 140 |
26 files changed, 1506 insertions, 467 deletions
@@ -1,6 +1,10 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.0 + *) mod_proxy_http2: using single connection for several requests *if* + master connection uses HTTP/2 itself. Not yet hardened under load. + [Stefan Eissing] + *) core: Added support for HTTP code 451. PR58985. [Yehuda Katz <yehuda ymkatz.net>, Jim Jagielski] @@ -22,6 +26,7 @@ Changes with Apache 2.5.0 *) mod_proxy_http2: new experimental http2 proxy module for h2: and h2c: proxy urls. Uses, so far, one connection per request, reuses connections. + [Stefan Eissing] *) event: use pre_connection hook to properly initialize connection state for slave connections. use protocol_switch hook to initialize server config diff --git a/CMakeLists.txt b/CMakeLists.txt index 3ea412f02d..d9fc914a38 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -409,7 +409,7 @@ SET(mod_http2_extra_sources modules/http2/h2_session.c modules/http2/h2_stream.c modules/http2/h2_stream_set.c modules/http2/h2_switch.c modules/http2/h2_task.c modules/http2/h2_task_input.c - modules/http2/h2_task_output.c modules/http2/h2_task_queue.c + modules/http2/h2_task_output.c modules/http2/h2_int_queue.c modules/http2/h2_util.c modules/http2/h2_worker.c modules/http2/h2_workers.c ) diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2 index dd4ac10d20..0e53b4ae1f 100644 --- a/modules/http2/NWGNUmod_http2 +++ b/modules/http2/NWGNUmod_http2 @@ -194,6 +194,7 @@ FILES_nlm_objs = \ $(OBJDIR)/h2_filter.o \ $(OBJDIR)/h2_from_h1.o \ $(OBJDIR)/h2_h2.o \ + $(OBJDIR)/h2_int_queue.o \ $(OBJDIR)/h2_io.o \ $(OBJDIR)/h2_io_set.o \ $(OBJDIR)/h2_mplx.o \ @@ -207,7 +208,6 @@ FILES_nlm_objs = \ $(OBJDIR)/h2_task.o \ $(OBJDIR)/h2_task_input.o \ $(OBJDIR)/h2_task_output.o \ - $(OBJDIR)/h2_task_queue.o \ $(OBJDIR)/h2_util.o \ $(OBJDIR)/h2_worker.o \ $(OBJDIR)/h2_workers.o \ diff --git a/modules/http2/config2.m4 b/modules/http2/config2.m4 index 0c9871552f..85a635e6b3 100644 --- a/modules/http2/config2.m4 +++ b/modules/http2/config2.m4 @@ -29,6 +29,7 @@ h2_ctx.lo dnl h2_filter.lo dnl h2_from_h1.lo dnl h2_h2.lo dnl +h2_int_queue.lo dnl h2_io.lo dnl h2_io_set.lo dnl h2_mplx.lo dnl @@ -42,7 +43,6 @@ h2_switch.lo dnl h2_task.lo dnl h2_task_input.lo dnl h2_task_output.lo dnl -h2_task_queue.lo dnl h2_util.lo dnl h2_worker.lo dnl h2_workers.lo dnl @@ -156,8 +156,10 @@ AC_DEFUN([APACHE_CHECK_NGHTTP2],[ AC_MSG_WARN([nghttp2 library is unusable]) fi dnl # nghttp2 >= 1.3.0: access to stream weights - AC_CHECK_FUNCS([nghttp2_stream_get_weight], - [APR_ADDTO(MOD_CPPFLAGS, ["-DH2_NG2_STREAM_API"])], []) + AC_CHECK_FUNCS([nghttp2_stream_get_weight], [], [liberrors="yes"]) + if test "x$liberrors" != "x"; then + AC_MSG_WARN([nghttp2 version >= 1.3.0 is required]) + fi dnl # nghttp2 >= 1.5.0: changing stream priorities AC_CHECK_FUNCS([nghttp2_session_change_stream_priority], [APR_ADDTO(MOD_CPPFLAGS, ["-DH2_NG2_CHANGE_PRIO"])], []) @@ -206,6 +208,7 @@ APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current]) dnl # list of module object files proxy_http2_objs="dnl mod_proxy_http2.lo dnl +h2_int_queue.lo dnl h2_proxy_session.lo dnl h2_request.lo dnl h2_util.lo dnl diff --git a/modules/http2/h2_ctx.c b/modules/http2/h2_ctx.c index b40037cb35..e8294fcb2b 100644 --- a/modules/http2/h2_ctx.c +++ b/modules/http2/h2_ctx.c @@ -101,7 +101,17 @@ int h2_ctx_is_task(h2_ctx *ctx) return ctx && ctx->task; } -struct h2_task *h2_ctx_get_task(h2_ctx *ctx) +h2_task *h2_ctx_get_task(h2_ctx *ctx) { return ctx? ctx->task : NULL; } + +h2_task *h2_ctx_cget_task(conn_rec *c) +{ + return h2_ctx_get_task(h2_ctx_get(c, 0)); +} + +h2_task *h2_ctx_rget_task(request_rec *r) +{ + return h2_ctx_get_task(h2_ctx_rget(r)); +} diff --git a/modules/http2/h2_ctx.h b/modules/http2/h2_ctx.h index 68dc7c84c3..3b2c842cae 100644 --- a/modules/http2/h2_ctx.h +++ b/modules/http2/h2_ctx.h @@ -71,5 +71,7 @@ const char *h2_ctx_protocol_get(const conn_rec *c); int h2_ctx_is_task(h2_ctx *ctx); struct h2_task *h2_ctx_get_task(h2_ctx *ctx); +struct h2_task *h2_ctx_cget_task(conn_rec *c); +struct h2_task *h2_ctx_rget_task(request_rec *r); #endif /* defined(__mod_h2__h2_ctx__) */ diff --git a/modules/http2/h2_int_queue.c b/modules/http2/h2_int_queue.c new file mode 100644 index 0000000000..ba44afb71c --- /dev/null +++ b/modules/http2/h2_int_queue.c @@ -0,0 +1,182 @@ +/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <assert.h> +#include <stddef.h> +#include <apr_pools.h> + +#include "h2_int_queue.h" + + +static void tq_grow(h2_int_queue *q, int nlen); +static void tq_swap(h2_int_queue *q, int i, int j); +static int tq_bubble_up(h2_int_queue *q, int i, int top, + h2_iq_cmp *cmp, void *ctx); +static int tq_bubble_down(h2_int_queue *q, int i, int bottom, + h2_iq_cmp *cmp, void *ctx); + +h2_int_queue *h2_iq_create(apr_pool_t *pool, int capacity) +{ + h2_int_queue *q = apr_pcalloc(pool, sizeof(h2_int_queue)); + if (q) { + q->pool = pool; + tq_grow(q, capacity); + q->nelts = 0; + } + return q; +} + +int h2_iq_empty(h2_int_queue *q) +{ + return q->nelts == 0; +} + +int h2_iq_size(h2_int_queue *q) +{ + return q->nelts; +} + + +void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx) +{ + int i; + + if (q->nelts >= q->nalloc) { + tq_grow(q, q->nalloc * 2); + } + + i = (q->head + q->nelts) % q->nalloc; + q->elts[i] = sid; + ++q->nelts; + + if (cmp) { + /* bubble it to the front of the queue */ + tq_bubble_up(q, i, q->head, cmp, ctx); + } +} + +int h2_iq_remove(h2_int_queue *q, int sid) +{ + int i; + for (i = 0; i < q->nelts; ++i) { + if (sid == q->elts[(q->head + i) % q->nalloc]) { + break; + } + } + + if (i < q->nelts) { + ++i; + for (; i < q->nelts; ++i) { + q->elts[(q->head+i-1)%q->nalloc] = q->elts[(q->head+i)%q->nalloc]; + } + --q->nelts; + return 1; + } + return 0; +} + +void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx) +{ + /* Assume that changes in ordering are minimal. This needs, + * best case, q->nelts - 1 comparisions to check that nothing + * changed. + */ + if (q->nelts > 0) { + int i, ni, prev, last; + + /* Start at the end of the queue and create a tail of sorted + * entries. Make that tail one element longer in each iteration. + */ + last = i = (q->head + q->nelts - 1) % q->nalloc; + while (i != q->head) { + prev = (q->nalloc + i - 1) % q->nalloc; + + ni = tq_bubble_up(q, i, prev, cmp, ctx); + if (ni == prev) { + /* i bubbled one up, bubble the new i down, which + * keeps all tasks below i sorted. */ + tq_bubble_down(q, i, last, cmp, ctx); + } + i = prev; + }; + } +} + + +int h2_iq_shift(h2_int_queue *q) +{ + int sid; + + if (q->nelts <= 0) { + return 0; + } + + sid = q->elts[q->head]; + q->head = (q->head + 1) % q->nalloc; + q->nelts--; + + return sid; +} + +static void tq_grow(h2_int_queue *q, int nlen) +{ + if (nlen > q->nalloc) { + int *nq = apr_pcalloc(q->pool, sizeof(int) * nlen); + if (q->nelts > 0) { + int l = ((q->head + q->nelts) % q->nalloc) - q->head; + + memmove(nq, q->elts + q->head, sizeof(int) * l); + if (l < q->nelts) { + /* elts wrapped, append elts in [0, remain] to nq */ + int remain = q->nelts - l; + memmove(nq + l, q->elts, sizeof(int) * remain); + } + } + q->elts = nq; + q->nalloc = nlen; + q->head = 0; + } +} + +static void tq_swap(h2_int_queue *q, int i, int j) +{ + int x = q->elts[i]; + q->elts[i] = q->elts[j]; + q->elts[j] = x; +} + +static int tq_bubble_up(h2_int_queue *q, int i, int top, + h2_iq_cmp *cmp, void *ctx) +{ + int prev; + while (((prev = (q->nalloc + i - 1) % q->nalloc), i != top) + && (*cmp)(q->elts[i], q->elts[prev], ctx) < 0) { + tq_swap(q, prev, i); + i = prev; + } + return i; +} + +static int tq_bubble_down(h2_int_queue *q, int i, int bottom, + h2_iq_cmp *cmp, void *ctx) +{ + int next; + while (((next = (q->nalloc + i + 1) % q->nalloc), i != bottom) + && (*cmp)(q->elts[i], q->elts[next], ctx) > 0) { + tq_swap(q, next, i); + i = next; + } + return i; +} diff --git a/modules/http2/h2_int_queue.h b/modules/http2/h2_int_queue.h new file mode 100644 index 0000000000..6cdd84c42b --- /dev/null +++ b/modules/http2/h2_int_queue.h @@ -0,0 +1,103 @@ +/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __mod_h2__h2_int_queue__ +#define __mod_h2__h2_int_queue__ + +/** + * h2_int_queue keeps a list of sorted h2_task* in ascending order. + */ +typedef struct h2_int_queue h2_int_queue; + +struct h2_int_queue { + int *elts; + int head; + int nelts; + int nalloc; + apr_pool_t *pool; +}; + +/** + * Comparator for two task to determine their order. + * + * @param s1 stream id to compare + * @param s2 stream id to compare + * @param ctx provided user data + * @return value is the same as for strcmp() and has the effect: + * == 0: s1 and s2 are treated equal in ordering + * < 0: s1 should be sorted before s2 + * > 0: s2 should be sorted before s1 + */ +typedef int h2_iq_cmp(int s1, int s2, void *ctx); + + +/** + * Allocate a new queue from the pool and initialize. + * @param id the identifier of the queue + * @param pool the memory pool + */ +h2_int_queue *h2_iq_create(apr_pool_t *pool, int capacity); + +/** + * Return != 0 iff there are no tasks in the queue. + * @param q the queue to check + */ +int h2_iq_empty(h2_int_queue *q); + +/** + * Return the number of int in the queue. + * @param q the queue to get size on + */ +int h2_iq_size(h2_int_queue *q); + +/** + * Add a stream idto the queue. + * + * @param q the queue to append the task to + * @param sid the stream id to add + * @param cmp the comparator for sorting + * @param ctx user data for comparator + */ +void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx); + +/** + * Remove the stream id from the queue. Return != 0 iff task + * was found in queue. + * @param q the task queue + * @param sid the stream id to remove + * @return != 0 iff task was found in queue + */ +int h2_iq_remove(h2_int_queue *q, int sid); + +/** + * Sort the stream idqueue again. Call if the task ordering + * has changed. + * + * @param q the queue to sort + * @param cmp the comparator for sorting + * @param ctx user data for the comparator + */ +void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx); + +/** + * Get the first stream id from the queue or NULL if the queue is empty. + * The task will be removed. + * + * @param q the queue to get the first task from + * @return the first stream id of the queue, 0 if empty + */ +int h2_iq_shift(h2_int_queue *q); + +#endif /* defined(__mod_h2__h2_int_queue__) */ diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index 07953d1899..6107c01ea5 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -75,6 +75,11 @@ int h2_io_in_has_eos_for(h2_io *io) return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1)); } +int h2_io_in_has_data(h2_io *io) +{ + return io->bbin && h2_util_bb_has_data_or_eos(io->bbin); +} + int h2_io_out_has_data(h2_io *io) { return io->bbout && h2_util_bb_has_data_or_eos(io->bbout); @@ -256,6 +261,18 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, } } + if (status == APR_SUCCESS && (!io->bbin || APR_BRIGADE_EMPTY(io->bbin))) { + if (io->eos_in) { + if (!io->eos_in_written) { + status = append_eos(io, bb, trailers); + io->eos_in_written = 1; + } + } + } + + if (status == APR_SUCCESS && APR_BRIGADE_EMPTY(bb)) { + return APR_EAGAIN; + } return status; } diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index f0b085b0d2..9c1584a376 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -63,9 +63,6 @@ struct h2_io { apr_size_t input_consumed; /* how many bytes have been read */ int files_handles_owned; - - struct h2_task *task; /* parked task */ - request_rec *r; /* parked request */ }; /******************************************************************************* @@ -101,6 +98,10 @@ int h2_io_in_has_eos_for(h2_io *io); * Output data is available. */ int h2_io_out_has_data(h2_io *io); +/** + * Input data is available. + */ +int h2_io_in_has_data(h2_io *io); void h2_io_signal(h2_io *io, h2_io_op op); void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 80b36c05ca..f5c4bb7acd 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -34,6 +34,7 @@ #include "h2_conn.h" #include "h2_ctx.h" #include "h2_h2.h" +#include "h2_int_queue.h" #include "h2_io.h" #include "h2_io_set.h" #include "h2_response.h" @@ -44,7 +45,6 @@ #include "h2_task.h" #include "h2_task_input.h" #include "h2_task_output.h" -#include "h2_task_queue.h" #include "h2_worker.h" #include "h2_workers.h" #include "h2_util.h" @@ -206,7 +206,13 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, return NULL; } - m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS)); + status = apr_thread_cond_create(&m->request_done, m->pool); + if (status != APR_SUCCESS) { + h2_mplx_destroy(m); + return NULL; + } + + m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS)); m->stream_ios = h2_io_set_create(m->pool); m->ready_ios = h2_io_set_create(m->pool); m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); @@ -296,7 +302,7 @@ static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) h2_io_set_remove(m->ready_ios, io); if (!io->processing_started || io->processing_done) { /* already finished or not even started yet */ - h2_tq_remove(m->q, io->id); + h2_iq_remove(m->q, io->id); io_destroy(m, io, 1); return 0; } @@ -324,6 +330,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) /* disable WINDOW_UPDATE callbacks */ h2_mplx_set_consumed_cb(m, NULL, NULL); + apr_thread_cond_broadcast(m->request_done); while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) { /* iterate until all ios have been orphaned or destroyed */ } @@ -351,6 +358,8 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) "all h2_workers to return, have still %d requests outstanding", m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios)); } + m->aborted = 1; + apr_thread_cond_broadcast(m->request_done); } } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) @@ -408,7 +417,7 @@ static const h2_request *pop_request(h2_mplx *m) { const h2_request *req = NULL; int sid; - while (!m->aborted && !req && (sid = h2_tq_shift(m->q)) > 0) { + while (!m->aborted && !req && (sid = h2_iq_shift(m->q)) > 0) { h2_io *io = h2_io_set_get(m->stream_ios, sid); if (io) { req = io->request; @@ -421,17 +430,8 @@ static const h2_request *pop_request(h2_mplx *m) return req; } -static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, request_rec *r) -{ - if (!m->engine_queue) { - apr_queue_create(&m->engine_queue, 200, m->pool); - } - return apr_queue_trypush(m->engine_queue, r); -} - -void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq) +void h2_mplx_request_done(h2_mplx *m, int stream_id, const h2_request **preq) { - h2_mplx *m = *pm; int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { @@ -440,49 +440,30 @@ void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq) ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_mplx(%ld): request(%d) done", m->id, stream_id); if (io) { - request_rec *r = io->r; - + io->processing_done = 1; + h2_mplx_out_close(m, stream_id, NULL); if (io->orphaned) { - io->processing_done = 1; - } - else if (r) { - /* A parked request which is being transferred from - * one worker thread to another. This request_done call - * was from the initial thread and now it is safe to - * schedule it for further processing. */ - h2_task_thaw(io->task); - io->task = NULL; - io->r = NULL; - h2_mplx_engine_schedule(*pm, r); + io_destroy(m, io, 0); + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); + } } else { - io->processing_done = 1; - } - - if (io->processing_done) { - h2_io_out_close(io, NULL); - if (io->orphaned) { - io_destroy(m, io, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - } - else { - /* hang around until the stream deregisteres */ - } + /* hang around until the stream deregisteres */ } } + else { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): request(%d) done, no io found", + m->id, stream_id); + } + apr_thread_cond_broadcast(m->request_done); } if (preq) { /* someone wants another request, if we have */ *preq = pop_request(m); } - if (!preq || !*preq) { - /* No request to hand back to the worker, NULLify reference - * and decrement count */ - *pm = NULL; - } leave_mutex(m, acquired); } } @@ -935,6 +916,26 @@ int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id) return has_eos; } +int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id) +{ + apr_status_t status; + int has_data = 0; + int acquired; + + AP_DEBUG_ASSERT(m); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + h2_io *io = h2_io_set_get(m->stream_ios, stream_id); + if (io && !io->orphaned) { + has_data = h2_io_in_has_data(io); + } + else { + has_data = 0; + } + leave_mutex(m, acquired); + } + return has_data; +} + int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id) { apr_status_t status; @@ -1001,7 +1002,7 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) status = APR_ECONNABORTED; } else { - h2_tq_sort(m->q, cmp, ctx); + h2_iq_sort(m->q, cmp, ctx); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): reprioritize tasks", m->id); @@ -1050,8 +1051,8 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const h2_request *req, status = h2_io_in_close(io); } - was_empty = h2_tq_empty(m->q); - h2_tq_add(m->q, io->id, cmp, ctx); + was_empty = h2_iq_empty(m->q); + h2_iq_add(m->q, io->id, cmp, ctx); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, "h2_mplx(%ld-%d): process", m->c->id, stream_id); @@ -1079,61 +1080,116 @@ const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more) } else { req = pop_request(m); - *has_more = !h2_tq_empty(m->q); + *has_more = !h2_iq_empty(m->q); } leave_mutex(m, acquired); } return req; } -apr_status_t h2_mplx_engine_push(h2_mplx *m, h2_task *task, - const char *engine_type, + +/******************************************************************************* + * HTTP/2 request engines + ******************************************************************************/ + +typedef struct h2_req_engine_i h2_req_engine_i; +struct h2_req_engine_i { + h2_req_engine pub; + conn_rec *c; /* connection this engine is assigned to */ + h2_mplx *m; + unsigned int shutdown : 1; /* engine is being shut down */ + apr_thread_cond_t *io; /* condition var for waiting on data */ + apr_queue_t *queue; /* queue of scheduled request_rec* */ + apr_size_t no_assigned; /* # of assigned requests */ + apr_size_t no_live; /* # of live */ + apr_size_t no_finished; /* # of finished */ +}; + +static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, + h2_req_engine_i *engine, + request_rec *r) +{ + if (!engine->queue) { + apr_queue_create(&engine->queue, 100, engine->pub.pool); + } + return apr_queue_trypush(engine->queue, r); +} + + +apr_status_t h2_mplx_engine_push(const char *engine_type, request_rec *r, h2_mplx_engine_init *einit) { apr_status_t status; + h2_mplx *m; + h2_task *task; int acquired; + task = h2_ctx_rget_task(r); + if (!task) { + return APR_ECONNABORTED; + } + m = task->mplx; AP_DEBUG_ASSERT(m); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); if (!io || io->orphaned) { status = APR_ECONNABORTED; } else { - h2_req_engine *engine; + h2_req_engine_i *engine = (h2_req_engine_i*)m->engine; apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id); status = APR_EOF; - engine = m->engine; /* just a single one for now */ + if (task->ser_headers) { /* Max compatibility, deny processing of this */ } - else if (!engine && einit) { - engine = apr_pcalloc(r->connection->pool, sizeof(*engine)); - engine->id = 1; - engine->c = r->connection; - engine->pool = r->connection->pool; - engine->type = apr_pstrdup(engine->pool, engine_type); - - status = einit(engine, r); - if (status == APR_SUCCESS) { - m->engine = engine; + else if (engine && !strcmp(engine->pub.type, engine_type)) { + if (engine->shutdown + || engine->no_assigned >= H2MIN(engine->pub.capacity, 100)) { ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, - "h2_mplx(%ld): init engine %d (%s)", - m->c->id, engine->id, engine->type); + "h2_mplx(%ld): engine shutdown or over %s", + m->c->id, engine->pub.id); + engine = NULL; } - } - else if (engine && !strcmp(engine->type, engine_type)) { - if (status == APR_SUCCESS) { + else if (h2_mplx_engine_schedule(m, engine, r) == APR_SUCCESS) { /* this task will be processed in another thread, * freeze any I/O for the time being. */ h2_task_freeze(task, r); - io->task = task; - io->r = r; + engine->no_assigned++; + status = APR_SUCCESS; + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, + "h2_mplx(%ld): push request %s", + m->c->id, r->the_request); + } + else { + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, + "h2_mplx(%ld): engine error adding req %s", + m->c->id, engine->pub.id); + engine = NULL; + } + } + + if (!engine && einit) { + engine = apr_pcalloc(task->pool, sizeof(*engine)); + engine->pub.id = apr_psprintf(task->pool, "eng-%ld-%d", + m->id, m->next_eng_id++); + engine->pub.pool = task->pool; + engine->pub.type = apr_pstrdup(task->pool, engine_type); + engine->c = r->connection; + engine->m = m; + engine->io = task->io; + engine->no_assigned = 1; + engine->no_live = 1; + + status = einit(&engine->pub, r); + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, + "h2_mplx(%ld): init engine %s (%s)", + m->c->id, engine->pub.id, engine->pub.type); + if (status == APR_SUCCESS) { + m->engine = &engine->pub; } - ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, r, - "h2_mplx(%ld): push request %s", - m->c->id, r->the_request); } } @@ -1141,52 +1197,163 @@ apr_status_t h2_mplx_engine_push(h2_mplx *m, h2_task *task, } return status; } + +static request_rec *get_non_frozen(apr_queue_t *equeue) +{ + request_rec *r, *first = NULL; + h2_task *task; + void *elem; + + if (equeue) { + /* FIFO queue, try to find a request_rec whose task is not frozen */ + while (apr_queue_trypop(equeue, &elem) == APR_SUCCESS) { + r = elem; + task = h2_ctx_rget_task(r); + AP_DEBUG_ASSERT(task); + if (!task->frozen) { + return r; + } + apr_queue_push(equeue, r); + if (!first) { + first = r; + } + else if (r == first) { + return NULL; /* walked the whole queue */ + } + } + } + return NULL; +} + +static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, + apr_read_type_e block, request_rec **pr) +{ + request_rec *r; + + AP_DEBUG_ASSERT(m); + AP_DEBUG_ASSERT(engine); + while (1) { + if (m->aborted) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): mplx abort while pulling requests %s", + m->id, engine->pub.id); + *pr = NULL; + return APR_EOF; + } + + if (engine->queue && (r = get_non_frozen(engine->queue))) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, + "h2_mplx(%ld): request %s pulled by engine %s", + m->c->id, r->the_request, engine->pub.id); + engine->no_live++; + *pr = r; + return APR_SUCCESS; + } + else if (APR_NONBLOCK_READ == block) { + *pr = NULL; + return APR_EAGAIN; + } + else if (!engine->queue || !apr_queue_size(engine->queue)) { + engine->shutdown = 1; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): emtpy queue, shutdown engine %s", + m->id, engine->pub.id); + *pr = NULL; + return APR_EOF; + } + apr_thread_cond_timedwait(m->request_done, m->lock, + apr_time_from_msec(100)); + } +} -apr_status_t h2_mplx_engine_pull(h2_mplx *m, h2_task *task, - struct h2_req_engine *engine, - apr_time_t timeout, request_rec **pr) +apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine, + apr_read_type_e block, request_rec **pr) { + h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine; + h2_mplx *m = engine->m; apr_status_t status; int acquired; - AP_DEBUG_ASSERT(m); + *pr = NULL; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - status = APR_ECONNABORTED; - if (m->engine == engine && m->engine_queue) { - void *elem; - status = apr_queue_trypop(m->engine_queue, &elem); - if (status == APR_SUCCESS) { - *pr = elem; - ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, *pr, - "h2_mplx(%ld): request %s pulled by engine %d", - m->c->id, (*pr)->the_request, engine->id); - } - } + status = engine_pull(m, engine, block, pr); leave_mutex(m, acquired); } return status; } -void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn) +static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task, + int waslive, int aborted) { - int stream_id = task->stream_id; + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, + "h2_mplx(%ld): task %s %s by %s", + m->id, task->id, aborted? "aborted":"done", + engine->pub.id); h2_task_output_close(task->output); - h2_mplx_request_done(&m, stream_id, NULL); - apr_pool_destroy(r_conn->pool); + h2_mplx_request_done(m, task->stream_id, NULL); + apr_pool_destroy(task->pool); + engine->no_finished++; + if (waslive) engine->no_live--; + engine->no_assigned--; +} + +void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn) +{ + h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine; + h2_mplx *m = engine->m; + h2_task *task; + int acquired; + + task = h2_ctx_cget_task(r_conn); + if (task && (enter_mutex(m, &acquired) == APR_SUCCESS)) { + engine_done(m, engine, task, 1, 0); + leave_mutex(m, acquired); + } } -void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task, - struct h2_req_engine *engine) +void h2_mplx_engine_exit(h2_req_engine *pub_engine) { + h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine; + h2_mplx *m = engine->m; int acquired; - AP_DEBUG_ASSERT(m); if (enter_mutex(m, &acquired) == APR_SUCCESS) { - /* TODO: shutdown of engine->c */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): exit engine %d (%s)", - m->c->id, engine->id, engine->type); - m->engine = NULL; + if (engine->queue && apr_queue_size(engine->queue)) { + void *entry; + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): exit engine %s (%s), " + "has still %d requests queued, " + "assigned=%ld, live=%ld, finished=%ld", + m->c->id, engine->pub.id, engine->pub.type, + (int)apr_queue_size(engine->queue), + (long)engine->no_assigned, (long)engine->no_live, + (long)engine->no_finished); + while (apr_queue_trypop(engine->queue, &entry) == APR_SUCCESS) { + request_rec *r = entry; + h2_task *task = h2_ctx_rget_task(r); + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): engine %s has queued task %s, " + "frozen=%d, aborting", + m->c->id, engine->pub.id, task->id, task->frozen); + engine_done(m, engine, task, 0, 1); + } + } + if (engine->no_assigned > 1 || engine->no_live > 1) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): exit engine %s (%s), " + "assigned=%ld, live=%ld, finished=%ld", + m->c->id, engine->pub.id, engine->pub.type, + (long)engine->no_assigned, (long)engine->no_live, + (long)engine->no_finished); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): exit engine %s (%s)", + m->c->id, engine->pub.id, engine->pub.type); + } + if (m->engine == &engine->pub) { + m->engine = NULL; /* TODO */ + } leave_mutex(m, acquired); } } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 837025f996..724cfe1c2e 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -46,7 +46,7 @@ struct h2_io_set; struct apr_thread_cond_t; struct h2_workers; struct h2_stream_set; -struct h2_task_queue; +struct h2_int_queue; struct h2_req_engine; #include <apr_queue.h> @@ -69,7 +69,7 @@ struct h2_mplx { unsigned int aborted : 1; - struct h2_task_queue *q; + struct h2_int_queue *q; struct h2_io_set *stream_ios; struct h2_io_set *ready_ios; @@ -77,6 +77,7 @@ struct h2_mplx { apr_thread_mutex_t *lock; struct apr_thread_cond_t *added_output; + struct apr_thread_cond_t *request_done; struct apr_thread_cond_t *join_wait; apr_size_t stream_max_mem; @@ -91,7 +92,9 @@ struct h2_mplx { void *input_consumed_ctx; struct h2_req_engine *engine; + /* TODO: signal for waiting tasks*/ apr_queue_t *engine_queue; + int next_eng_id; }; @@ -127,7 +130,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait */ void h2_mplx_abort(h2_mplx *mplx); -void h2_mplx_request_done(h2_mplx **pm, int stream_id, const struct h2_request **preq); +void h2_mplx_request_done(h2_mplx *m, int stream_id, const struct h2_request **preq); /** * Get the highest stream identifier that has been passed on to processing. @@ -151,10 +154,14 @@ int h2_mplx_get_max_stream_started(h2_mplx *m); */ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error); -/* Return != 0 iff the multiplexer has data for the given stream. +/* Return != 0 iff the multiplexer has output data for the given stream. */ int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id); +/* Return != 0 iff the multiplexer has input data for the given stream. + */ +int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id); + /** * Waits on output data from any stream in this session to become available. * Returns APR_TIMEUP if no data arrived in the given time. @@ -385,17 +392,14 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx, link); \ typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine, request_rec *r); -apr_status_t h2_mplx_engine_push(h2_mplx *m, struct h2_task *task, - const char *engine_type, +apr_status_t h2_mplx_engine_push(const char *engine_type, request_rec *r, h2_mplx_engine_init *einit); -apr_status_t h2_mplx_engine_pull(h2_mplx *m, struct h2_task *task, - struct h2_req_engine *engine, - apr_time_t timeout, request_rec **pr); +apr_status_t h2_mplx_engine_pull(struct h2_req_engine *engine, + apr_read_type_e block, request_rec **pr); -void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn); +void h2_mplx_engine_done(struct h2_req_engine *engine, conn_rec *r_conn); -void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task, - struct h2_req_engine *engine); +void h2_mplx_engine_exit(struct h2_req_engine *engine); #endif /* defined(__mod_h2__h2_mplx__) */ diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index 8f7b5d0a98..a55d7ec739 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -21,12 +21,33 @@ #include <mod_http2.h> #include "h2.h" +#include "h2_int_queue.h" #include "h2_request.h" #include "h2_util.h" #include "h2_proxy_session.h" APLOG_USE_MODULE(proxy_http2); +typedef struct h2_proxy_stream { + int id; + apr_pool_t *pool; + h2_proxy_session *session; + + const char *url; + request_rec *r; + h2_request *req; + + h2_stream_state_t state; + unsigned int suspended : 1; + unsigned int data_received : 1; + + apr_bucket_brigade *input; + apr_bucket_brigade *output; + + apr_table_t *saves; +} h2_proxy_stream; + + static int ngstatus_from_apr_status(apr_status_t rv) { if (rv == APR_SUCCESS) { @@ -41,19 +62,21 @@ static int ngstatus_from_apr_status(apr_status_t rv) return NGHTTP2_ERR_PROTO; } +static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, + int arg, const char *msg); -static apr_status_t proxy_session_shutdown(void *theconn) + +static apr_status_t proxy_session_pre_close(void *theconn) { proxy_conn_rec *p_conn = (proxy_conn_rec *)theconn; h2_proxy_session *session = p_conn->data; if (session && session->ngh2) { - if (session->c && !session->c->aborted && !session->goaway_sent) { - nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, - session->max_stream_recv, 0, NULL, 0); - nghttp2_session_send(session->ngh2); - } - + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "proxy_session(%s): shutdown, state=%d, streams=%d", + session->id, session->state, + h2_iq_size(session->streams)); + dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL); nghttp2_session_del(session->ngh2); session->ngh2 = NULL; p_conn->data = NULL; @@ -109,8 +132,8 @@ static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data, int flush = 1; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, - "h2_proxy_sesssion(%ld): raw_send %d bytes, flush=%d", - session->c->id, (int)length, flush); + "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d", + session->id, (int)length, flush); b = apr_bucket_transient_create((const char*)data, length, session->c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(session->output, b); @@ -120,7 +143,7 @@ static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data, session->output, flush); if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, - "h2_proxy_sesssion(%ld): sending", session->c->id); + "h2_proxy_sesssion(%s): sending", session->id); return NGHTTP2_ERR_CALLBACK_FAILURE; } return length; @@ -138,8 +161,8 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame, h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO() - "h2_session(%ld): recv FRAME[%s]", - session->c->id, buffer); + "h2_session(%s): recv FRAME[%s]", + session->id, buffer); } switch (frame->hd.type) { @@ -150,9 +173,22 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame, break; case NGHTTP2_PUSH_PROMISE: break; + case NGHTTP2_SETTINGS: + if (frame->settings.niv > 0) { + session->remote_max_concurrent = nghttp2_session_get_remote_settings(ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS); + } + break; case NGHTTP2_GOAWAY: - session->goaway_recvd = 1; + dispatch_event(session, H2_PROXYS_EV_REMOTE_GOAWAY, 0, NULL); /* TODO: close handling */ + if (APLOGcinfo(session->c)) { + char buffer[256]; + + h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO() + "h2_session(%s): recv FRAME[%s]", + session->id, buffer); + } break; default: break; @@ -164,13 +200,27 @@ static int before_frame_send(nghttp2_session *ngh2, const nghttp2_frame *frame, void *user_data) { h2_proxy_session *session = user_data; - if (APLOGcdebug(session->c)) { - char buffer[256]; - - h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03068) - "h2_session(%ld): sent FRAME[%s]", - session->c->id, buffer); + switch (frame->hd.type) { + case NGHTTP2_GOAWAY: + if (APLOGcinfo(session->c)) { + char buffer[256]; + + h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO() + "h2_session(%s): sent FRAME[%s]", + session->id, buffer); + } + break; + default: + if (APLOGcdebug(session->c)) { + char buffer[256]; + + h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO() + "h2_session(%s): sent FRAME[%s]", + session->id, buffer); + } + break; } return 0; } @@ -217,8 +267,8 @@ static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream, char *s = apr_pstrndup(stream->pool, v, vlen); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, - "h2_proxy_stream(%ld-%d): got status %s", - stream->session->c->id, stream->id, s); + "h2_proxy_stream(%s-%d): got status %s", + stream->session->id, stream->id, s); stream->r->status = (int)apr_atoi64(s); if (stream->r->status <= 0) { stream->r->status = 500; @@ -236,8 +286,8 @@ static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream, hvalue = apr_pstrndup(stream->pool, v, vlen); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, - "h2_proxy_stream(%ld-%d): got header %s: %s", - stream->session->c->id, stream->id, hname, hvalue); + "h2_proxy_stream(%s-%d): got header %s: %s", + stream->session->id, stream->id, hname, hvalue); process_proxy_header(stream->r, hname, hvalue); } return APR_SUCCESS; @@ -248,8 +298,8 @@ static int log_header(void *ctx, const char *key, const char *value) h2_proxy_stream *stream = ctx; ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, - "h2_proxy_stream(%ld-%d), header_out %s: %s", - stream->session->c->id, stream->id, key, value); + "h2_proxy_stream(%s-%d), header_out %s: %s", + stream->session->id, stream->id, key, value); return 1; } @@ -307,8 +357,8 @@ static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream) if (APLOGrtrace2(stream->r)) { ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, - "h2_proxy_stream(%ld-%d), header_out after merging", - stream->session->c->id, stream->id); + "h2_proxy_stream(%s-%d), header_out after merging", + stream->session->id, stream->id); apr_table_do(log_header, stream, stream->r->headers_out, NULL); } } @@ -338,11 +388,15 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags, b = apr_bucket_transient_create((const char*)data, len, stream->r->connection->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->output, b); + if (flags & NGHTTP2_DATA_FLAG_EOF) { + b = apr_bucket_flush_create(stream->r->connection->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(stream->output, b); + } status = ap_pass_brigade(stream->r->output_filters, stream->output); if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO() - "h2_session(%ld-%d): passing output", - session->c->id, stream->id); + "h2_session(%s-%d): passing output", + session->id, stream->id); return NGHTTP2_ERR_CALLBACK_FAILURE; } return 0; @@ -352,23 +406,7 @@ static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id, uint32_t error_code, void *user_data) { h2_proxy_session *session = user_data; - h2_proxy_stream *stream; - - stream = nghttp2_session_get_stream_user_data(ngh2, stream_id); - if (!stream) { - return 0; - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, - "h2_proxy_sesssion(%ld): closing stream(%d)", - session->c->id, stream_id); - - if (!stream->data_received) { - /* last chance to manipulate response headers. - * after this, only trailers */ - stream->data_received = 1; - } - stream->state = H2_STREAM_ST_CLOSED; + dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL); return 0; } @@ -413,11 +451,11 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id, if (APR_BRIGADE_EMPTY(stream->input)) { status = ap_get_brigade(stream->r->input_filters, stream->input, - AP_MODE_READBYTES, APR_BLOCK_READ, - H2MIN(APR_BUCKET_BUFF_SIZE, length)); + AP_MODE_READBYTES, APR_NONBLOCK_READ, + H2MAX(APR_BUCKET_BUFF_SIZE, length)); ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, - "h2_proxy_stream(%d): request body read", - stream->id); + "h2_proxy_stream(%s-%d): request body read", + stream->session->id, stream->id); } if (status == APR_SUCCESS) { @@ -459,31 +497,42 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id, return readlen; } else if (APR_STATUS_IS_EAGAIN(status)) { + /* suspended stream, needs to be re-awakened */ + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, + "h2_proxy_stream(%s-%d): suspending", + stream->session->id, stream_id); + stream->suspended = 1; + h2_iq_add(stream->session->suspended, stream->id, NULL, NULL); return NGHTTP2_ERR_DEFERRED; } return ngstatus_from_apr_status(status); } -h2_proxy_session *h2_proxy_session_setup(request_rec *r, proxy_conn_rec *p_conn, - proxy_server_conf *conf) +h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn, + proxy_server_conf *conf, + h2_proxy_request_done *done) { if (!p_conn->data) { + apr_pool_t *pool = p_conn->scpool; h2_proxy_session *session; - nghttp2_settings_entry settings[2]; nghttp2_session_callbacks *cbs; - int add_conn_window; - int rv; + nghttp2_option *option; - session = apr_pcalloc(p_conn->scpool, sizeof(*session)); - apr_pool_pre_cleanup_register(p_conn->scpool, p_conn, proxy_session_shutdown); + session = apr_pcalloc(pool, sizeof(*session)); + apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close); p_conn->data = session; + session->id = apr_pstrdup(p_conn->scpool, id); session->c = p_conn->connection; session->p_conn = p_conn; session->conf = conf; session->pool = p_conn->scpool; + session->state = H2_PROXYS_ST_INIT; session->window_bits_default = 30; session->window_bits_connection = 30; + session->streams = h2_iq_create(pool, 25); + session->suspended = h2_iq_create(pool, 5); + session->done = done; session->input = apr_brigade_create(session->pool, session->c->bucket_alloc); session->output = apr_brigade_create(session->pool, session->c->bucket_alloc); @@ -496,70 +545,41 @@ h2_proxy_session *h2_proxy_session_setup(request_rec *r, proxy_conn_rec *p_conn, nghttp2_session_callbacks_set_before_frame_send_callback(cbs, before_frame_send); nghttp2_session_callbacks_set_send_callback(cbs, raw_send); - nghttp2_session_client_new(&session->ngh2, cbs, session); + nghttp2_option_new(&option); + nghttp2_option_set_peer_max_concurrent_streams(option, 20); + + nghttp2_session_client_new2(&session->ngh2, cbs, session, option); + + nghttp2_option_del(option); nghttp2_session_callbacks_del(cbs); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "setup session for %s", p_conn->hostname); - settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH; - settings[0].value = 0; - settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; - settings[1].value = (1 << session->window_bits_default) - 1; - - rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, - H2_ALEN(settings)); - - /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */ - add_conn_window = ((1 << session->window_bits_connection) - 1 - - NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE); - if (!rv && add_conn_window != 0) { - rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window); - } } return p_conn->data; } - -apr_status_t h2_proxy_session_open_stream(h2_proxy_session *session, const char *url, - request_rec *r, h2_proxy_stream **pstream) +static apr_status_t session_start(h2_proxy_session *session) { - h2_proxy_stream *stream; - apr_uri_t puri; - const char *authority, *scheme, *path; - - stream = apr_pcalloc(r->pool, sizeof(*stream)); - - stream->pool = r->pool; - stream->url = url; - stream->r = r; - stream->session = session; - stream->state = H2_STREAM_ST_IDLE; + nghttp2_settings_entry settings[2]; + int rv, add_conn_window; - stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc); - stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc); + settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH; + settings[0].value = 0; + settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; + settings[1].value = (1 << session->window_bits_default) - 1; - stream->req = h2_request_create(1, stream->pool, 0); - - apr_uri_parse(stream->pool, url, &puri); - scheme = (strcmp(puri.scheme, "h2")? "http" : "https"); - authority = puri.hostname; - if (!ap_strchr_c(authority, ':') && puri.port - && apr_uri_port_of_scheme(scheme) != puri.port) { - /* port info missing and port is not default for scheme: append */ - authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port); - } - path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART); - h2_request_make(stream->req, stream->pool, r->method, scheme, - authority, path, r->headers_in); - - /* Tuck away all already existing cookies */ - stream->saves = apr_table_make(r->pool, 2); - apr_table_do(add_header, stream->saves, r->headers_out,"Set-Cookie", NULL); - - *pstream = stream; + rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, + H2_ALEN(settings)); - return APR_SUCCESS; + /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */ + add_conn_window = ((1 << session->window_bits_connection) - 1 - + NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE); + if (!rv && add_conn_window != 0) { + rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window); + } + return rv? APR_EGENERAL : APR_SUCCESS; } static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb) @@ -595,69 +615,57 @@ static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade * } ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - "h2_session(%ld): fed %ld bytes of input", session->c->id, (long)readlen); + "h2_session(%s): fed %ld bytes of input to session", + session->id, (long)readlen); if (readlen == 0 && status == APR_SUCCESS) { return APR_EAGAIN; } return status; } - -static apr_status_t stream_loop(h2_proxy_stream *stream) +static apr_status_t open_stream(h2_proxy_session *session, const char *url, + request_rec *r, h2_proxy_stream **pstream) { - h2_proxy_session *session = stream->session; - apr_status_t status = APR_SUCCESS; - int want_read, want_write; + h2_proxy_stream *stream; + apr_uri_t puri; + const char *authority, *scheme, *path; + + stream = apr_pcalloc(r->pool, sizeof(*stream)); + + stream->pool = r->pool; + stream->url = url; + stream->r = r; + stream->session = session; + stream->state = H2_STREAM_ST_IDLE; - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - "h2_session(%ld): start loop for stream %d", - session->c->id, stream->id); - while ((status == APR_SUCCESS || APR_STATUS_IS_EAGAIN(status)) - && stream->state != H2_STREAM_ST_CLOSED) { - - want_read = nghttp2_session_want_read(session->ngh2); - want_write = nghttp2_session_want_write(session->ngh2); - - if (want_write) { - int rv = nghttp2_session_send(session->ngh2); - if (rv < 0 && nghttp2_is_fatal(rv)) { - status = APR_EGENERAL; - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - "h2_session(%ld): write, rv=%d", session->c->id, rv); - break; - } - } + stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc); + stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc); + + stream->req = h2_request_create(1, stream->pool, 0); - if (want_read) { - status = ap_get_brigade(session->c->input_filters, session->input, - AP_MODE_READBYTES, - (want_write? APR_NONBLOCK_READ : APR_BLOCK_READ), - APR_BUCKET_BUFF_SIZE); - if (status == APR_SUCCESS) { - status = feed_brigade(session, session->input); - } - else if (!APR_STATUS_IS_EAGAIN(status)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - "h2_session(%ld): read", session->c->id); - break; - } - } - - if (!want_read && !want_write) { - status = APR_EGENERAL; - break; - } + apr_uri_parse(stream->pool, url, &puri); + scheme = (strcmp(puri.scheme, "h2")? "http" : "https"); + authority = puri.hostname; + if (!ap_strchr_c(authority, ':') && puri.port + && apr_uri_port_of_scheme(scheme) != puri.port) { + /* port info missing and port is not default for scheme: append */ + authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port); } + path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART); + h2_request_make(stream->req, stream->pool, r->method, scheme, + authority, path, r->headers_in); + + /* Tuck away all already existing cookies */ + stream->saves = apr_table_make(r->pool, 2); + apr_table_do(add_header, stream->saves, r->headers_out,"Set-Cookie", NULL); + + *pstream = stream; - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - "h2_session(%ld): end loop for stream %d", - session->c->id, stream->id); - return status; + return APR_SUCCESS; } -apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream) +static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *stream) { - h2_proxy_session *session = stream->session; h2_ngheader *hd; nghttp2_data_provider *pp = NULL; nghttp2_data_provider provider; @@ -685,16 +693,525 @@ apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream) const char *task_id = apr_table_get(stream->r->connection->notes, H2_TASK_ID_NOTE); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session(%ld): submit %s%s -> %d (task %s)", - session->c->id, stream->req->authority, stream->req->path, + "h2_session(%s): submit %s%s -> %d (task %s)", + session->id, stream->req->authority, stream->req->path, rv, task_id); } + else { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session(%s-%d): submit %s%s", + session->id, rv, stream->req->authority, stream->req->path); + } + if (rv > 0) { stream->id = rv; stream->state = H2_STREAM_ST_OPEN; + h2_iq_add(session->streams, stream->id, NULL, NULL); + dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL); - return stream_loop(stream); + return APR_SUCCESS; } return APR_EGENERAL; } +static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block) +{ + apr_status_t status; + status = ap_get_brigade(session->c->input_filters, session->input, + AP_MODE_READBYTES, + block? APR_BLOCK_READ : APR_NONBLOCK_READ, + APR_BUCKET_BUFF_SIZE); + if (status == APR_SUCCESS) { + if (APR_BRIGADE_EMPTY(session->input)) { + status = APR_EAGAIN; + } + else { + feed_brigade(session, session->input); + } + } + else if (!APR_STATUS_IS_EAGAIN(status)) { + dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL); + } + return status; +} + +apr_status_t h2_proxy_session_submit(h2_proxy_session *session, + const char *url, request_rec *r) +{ + h2_proxy_stream *stream; + apr_status_t status; + + status = open_stream(session, url, r, &stream); + if (status == OK) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, + "process stream(%d): %s %s%s, original: %s", + stream->id, stream->req->method, + stream->req->authority, stream->req->path, + r->the_request); + status = submit_stream(session, stream); + } + return status; +} + +static apr_status_t check_suspended(h2_proxy_session *session) +{ + h2_proxy_stream *stream; + int i, stream_id; + apr_status_t status; + + for (i = 0; i < session->suspended->nelts; ++i) { + stream_id = session->suspended->elts[i]; + stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id); + if (stream) { + status = ap_get_brigade(stream->r->input_filters, stream->input, + AP_MODE_READBYTES, APR_NONBLOCK_READ, + APR_BUCKET_BUFF_SIZE); + if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->input)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, + "h2_proxy_stream(%s-%d): resuming", + session->id, stream_id); + stream->suspended = 0; + h2_iq_remove(session->suspended, stream_id); + nghttp2_session_resume_data(session->ngh2, stream_id); + dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL); + check_suspended(session); + return APR_SUCCESS; + } + else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, + "h2_proxy_stream(%s-%d): check input", + session->id, stream_id); + h2_iq_remove(session->suspended, stream_id); + dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL); + check_suspended(session); + return APR_SUCCESS; + } + } + else { + /* gone? */ + h2_iq_remove(session->suspended, stream_id); + check_suspended(session); + return APR_SUCCESS; + } + } + return APR_EAGAIN; +} + +static apr_status_t session_shutdown(h2_proxy_session *session, int reason, + const char *msg) +{ + apr_status_t status = APR_SUCCESS; + const char *err = msg; + + AP_DEBUG_ASSERT(session); + if (!err && reason) { + err = nghttp2_strerror(reason); + } + nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0, + reason, (uint8_t*)err, err? strlen(err):0); + status = nghttp2_session_send(session->ngh2); + dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err); + return status; +} + + +static const char *StateNames[] = { + "INIT", /* H2_PROXYS_ST_INIT */ + "DONE", /* H2_PROXYS_ST_DONE */ + "IDLE", /* H2_PROXYS_ST_IDLE */ + "BUSY", /* H2_PROXYS_ST_BUSY */ + "WAIT", /* H2_PROXYS_ST_WAIT */ + "LSHUTDOWN", /* H2_PROXYS_ST_LOCAL_SHUTDOWN */ + "RSHUTDOWN", /* H2_PROXYS_ST_REMOTE_SHUTDOWN */ +}; + +static const char *state_name(h2_proxys_state state) +{ + if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) { + return "unknown"; + } + return StateNames[state]; +} + +static int is_accepting_streams(h2_proxy_session *session) +{ + switch (session->state) { + case H2_PROXYS_ST_IDLE: + case H2_PROXYS_ST_BUSY: + case H2_PROXYS_ST_WAIT: + return 1; + default: + return 0; + } +} + +static void transit(h2_proxy_session *session, const char *action, + h2_proxys_state nstate) +{ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO() + "h2_proxy_session(%s): transit [%s] -- %s --> [%s]", session->id, + state_name(session->state), action, state_name(nstate)); + session->state = nstate; +} + +static void ev_init(h2_proxy_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_PROXYS_ST_INIT: + if (h2_iq_empty(session->streams)) { + transit(session, "init", H2_PROXYS_ST_IDLE); + } + else { + transit(session, "init", H2_PROXYS_ST_BUSY); + } + break; + + default: + /* nop */ + break; + } +} + +static void ev_local_goaway(h2_proxy_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_PROXYS_ST_LOCAL_SHUTDOWN: + /* already did that? */ + break; + case H2_PROXYS_ST_IDLE: + case H2_PROXYS_ST_REMOTE_SHUTDOWN: + /* all done */ + transit(session, "local goaway", H2_PROXYS_ST_DONE); + break; + default: + transit(session, "local goaway", H2_PROXYS_ST_LOCAL_SHUTDOWN); + break; + } +} + +static void ev_remote_goaway(h2_proxy_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_PROXYS_ST_REMOTE_SHUTDOWN: + /* already received that? */ + break; + case H2_PROXYS_ST_IDLE: + case H2_PROXYS_ST_LOCAL_SHUTDOWN: + /* all done */ + transit(session, "remote goaway", H2_PROXYS_ST_DONE); + break; + default: + transit(session, "remote goaway", H2_PROXYS_ST_REMOTE_SHUTDOWN); + break; + } +} + +static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_PROXYS_ST_INIT: + case H2_PROXYS_ST_DONE: + case H2_PROXYS_ST_LOCAL_SHUTDOWN: + /* just leave */ + transit(session, "conn error", H2_PROXYS_ST_DONE); + break; + + default: + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_session(%s): conn error -> shutdown", session->id); + session_shutdown(session, arg, msg); + break; + } +} + +static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_PROXYS_ST_DONE: + case H2_PROXYS_ST_LOCAL_SHUTDOWN: + /* just leave */ + transit(session, "proto error", H2_PROXYS_ST_DONE); + break; + + default: + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_session(%s): proto error -> shutdown", session->id); + session_shutdown(session, arg, msg); + break; + } +} + +static void ev_conn_timeout(h2_proxy_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_PROXYS_ST_LOCAL_SHUTDOWN: + transit(session, "conn timeout", H2_PROXYS_ST_DONE); + break; + default: + session_shutdown(session, arg, msg); + transit(session, "conn timeout", H2_PROXYS_ST_DONE); + break; + } +} + +static void ev_no_io(h2_proxy_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_PROXYS_ST_BUSY: + case H2_PROXYS_ST_LOCAL_SHUTDOWN: + case H2_PROXYS_ST_REMOTE_SHUTDOWN: + /* nothing for input and output to do. If we remain + * in this state, we go into a tight loop and suck up + * CPU cycles. Ideally, we'd like to do a blocking read, but that + * is not possible if we have scheduled tasks and wait + * for them to produce something. */ + if (h2_iq_empty(session->streams)) { + if (!is_accepting_streams(session)) { + /* We are no longer accepting new streams and have + * finished processing existing ones. Time to leave. */ + session_shutdown(session, arg, msg); + transit(session, "no io", H2_PROXYS_ST_DONE); + } + else { + /* When we have no streams, no task event are possible, + * switch to blocking reads */ + transit(session, "no io", H2_PROXYS_ST_IDLE); + } + } + else { + /* Unable to do blocking reads, as we wait on events from + * task processing in other threads. Do a busy wait with + * backoff timer. */ + transit(session, "no io", H2_PROXYS_ST_WAIT); + } + break; + default: + /* nop */ + break; + } +} + +static void ev_stream_submitted(h2_proxy_session *session, int stream_id, + const char *msg) +{ + switch (session->state) { + case H2_PROXYS_ST_IDLE: + case H2_PROXYS_ST_WAIT: + transit(session, "stream submitted", H2_PROXYS_ST_BUSY); + break; + default: + /* nop */ + break; + } +} + +static void ev_stream_done(h2_proxy_session *session, int stream_id, + const char *msg) +{ + h2_proxy_stream *stream; + + stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id); + if (stream) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_proxy_sesssion(%s): stream(%d) closed", + session->id, stream_id); + if (!stream->data_received) { + /* last chance to manipulate response headers. + * after this, only trailers */ + stream->data_received = 1; + } + stream->state = H2_STREAM_ST_CLOSED; + h2_iq_remove(session->streams, stream_id); + h2_iq_remove(session->suspended, stream_id); + if (session->done) { + session->done(session, stream->r); + } + } + + switch (session->state) { + default: + /* nop */ + break; + } +} + +static void ev_stream_resumed(h2_proxy_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_PROXYS_ST_WAIT: + transit(session, "stream resumed", H2_PROXYS_ST_BUSY); + break; + default: + /* nop */ + break; + } +} + +static void ev_data_read(h2_proxy_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_PROXYS_ST_IDLE: + case H2_PROXYS_ST_WAIT: + transit(session, "data read", H2_PROXYS_ST_BUSY); + break; + /* fall through */ + default: + /* nop */ + break; + } +} + +static void ev_ngh2_done(h2_proxy_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_PROXYS_ST_DONE: + /* nop */ + break; + default: + transit(session, "nghttp2 done", H2_PROXYS_ST_DONE); + break; + } +} + +static void ev_pre_close(h2_proxy_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_PROXYS_ST_DONE: + case H2_PROXYS_ST_LOCAL_SHUTDOWN: + /* nop */ + break; + default: + session_shutdown(session, arg, msg); + break; + } +} + +static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, + int arg, const char *msg) +{ + switch (ev) { + case H2_PROXYS_EV_INIT: + ev_init(session, arg, msg); + break; + case H2_PROXYS_EV_LOCAL_GOAWAY: + ev_local_goaway(session, arg, msg); + break; + case H2_PROXYS_EV_REMOTE_GOAWAY: + ev_remote_goaway(session, arg, msg); + break; + case H2_PROXYS_EV_CONN_ERROR: + ev_conn_error(session, arg, msg); + break; + case H2_PROXYS_EV_PROTO_ERROR: + ev_proto_error(session, arg, msg); + break; + case H2_PROXYS_EV_CONN_TIMEOUT: + ev_conn_timeout(session, arg, msg); + break; + case H2_PROXYS_EV_NO_IO: + ev_no_io(session, arg, msg); + break; + case H2_PROXYS_EV_STREAM_SUBMITTED: + ev_stream_submitted(session, arg, msg); + break; + case H2_PROXYS_EV_STREAM_DONE: + ev_stream_done(session, arg, msg); + break; + case H2_PROXYS_EV_STREAM_RESUMED: + ev_stream_resumed(session, arg, msg); + break; + case H2_PROXYS_EV_DATA_READ: + ev_data_read(session, arg, msg); + break; + case H2_PROXYS_EV_NGH2_DONE: + ev_ngh2_done(session, arg, msg); + break; + case H2_PROXYS_EV_PRE_CLOSE: + ev_pre_close(session, arg, msg); + break; + default: + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_session(%s): unknown event %d", + session->id, ev); + break; + } +} + +apr_status_t h2_proxy_session_process(h2_proxy_session *session) +{ + apr_status_t status; + int have_written = 0, have_read = 0; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_session(%s): process", session->id); + + switch (session->state) { + case H2_PROXYS_ST_INIT: + status = session_start(session); + if (status == APR_SUCCESS) { + dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL); + } + else { + dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL); + } + break; + + case H2_PROXYS_ST_BUSY: + case H2_PROXYS_ST_LOCAL_SHUTDOWN: + case H2_PROXYS_ST_REMOTE_SHUTDOWN: + while (nghttp2_session_want_write(session->ngh2)) { + int rv = nghttp2_session_send(session->ngh2); + if (rv < 0 && nghttp2_is_fatal(rv)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_session(%s): write, rv=%d", session->id, rv); + dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL); + break; + } + have_written = 1; + } + + if (nghttp2_session_want_read(session->ngh2)) { + if (h2_proxy_session_read(session, 0) == APR_SUCCESS) { + have_read = 1; + } + } + + if (!have_written && !have_read + && !nghttp2_session_want_write(session->ngh2)) { + dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL); + } + break; + + case H2_PROXYS_ST_WAIT: + if (check_suspended(session) == APR_EAGAIN) { + /* no stream has become resumed. Do a blocking read with + * ever increasing timeouts... */ + if (h2_proxy_session_read(session, 0) == APR_SUCCESS) { + dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL); + } + } + break; + + case H2_PROXYS_ST_IDLE: + return APR_SUCCESS; + + case H2_PROXYS_ST_DONE: + return APR_SUCCESS; + + default: + ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c, + APLOGNO()"h2_session(%s): unknown state %d", + session->id, session->state); + dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL); + break; + } + + + if (!nghttp2_session_want_read(session->ngh2) + && !nghttp2_session_want_write(session->ngh2)) { + dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL); + } + + return APR_EAGAIN; +} + diff --git a/modules/http2/h2_proxy_session.h b/modules/http2/h2_proxy_session.h index 38924afdee..089fd107a5 100644 --- a/modules/http2/h2_proxy_session.h +++ b/modules/http2/h2_proxy_session.h @@ -20,50 +20,72 @@ #include <nghttp2/nghttp2.h> -typedef struct h2_proxy_session { +struct h2_int_queue; + +typedef enum { + H2_PROXYS_ST_INIT, /* send initial SETTINGS, etc. */ + H2_PROXYS_ST_DONE, /* finished, connection close */ + H2_PROXYS_ST_IDLE, /* no streams to process */ + H2_PROXYS_ST_BUSY, /* read/write without stop */ + H2_PROXYS_ST_WAIT, /* waiting for tasks reporting back */ + H2_PROXYS_ST_LOCAL_SHUTDOWN, /* we announced GOAWAY */ + H2_PROXYS_ST_REMOTE_SHUTDOWN, /* client announced GOAWAY */ +} h2_proxys_state; + +typedef enum { + H2_PROXYS_EV_INIT, /* session was initialized */ + H2_PROXYS_EV_LOCAL_GOAWAY, /* we send a GOAWAY */ + H2_PROXYS_EV_REMOTE_GOAWAY, /* remote send us a GOAWAY */ + H2_PROXYS_EV_CONN_ERROR, /* connection error */ + H2_PROXYS_EV_PROTO_ERROR, /* protocol error */ + H2_PROXYS_EV_CONN_TIMEOUT, /* connection timeout */ + H2_PROXYS_EV_NO_IO, /* nothing has been read or written */ + H2_PROXYS_EV_STREAM_SUBMITTED, /* stream has been submitted */ + H2_PROXYS_EV_STREAM_DONE, /* stream has been finished */ + H2_PROXYS_EV_STREAM_RESUMED, /* stream signalled availability of headers/data */ + H2_PROXYS_EV_DATA_READ, /* connection data has been read */ + H2_PROXYS_EV_NGH2_DONE, /* nghttp2 wants neither read nor write anything */ + H2_PROXYS_EV_PRE_CLOSE, /* connection will close after this */ +} h2_proxys_event_t; + + +typedef struct h2_proxy_session h2_proxy_session; +typedef void h2_proxy_request_done(h2_proxy_session *s, request_rec *r); + +struct h2_proxy_session { + const char *id; conn_rec *c; proxy_conn_rec *p_conn; proxy_server_conf *conf; apr_pool_t *pool; nghttp2_session *ngh2; /* the nghttp2 session itself */ + h2_proxy_request_done *done; + void *user_data; + int window_bits_default; int window_bits_connection; - unsigned int goaway_recvd : 1; - unsigned int goaway_sent : 1; - + h2_proxys_state state; + + struct h2_int_queue *streams; + struct h2_int_queue *suspended; + apr_size_t remote_max_concurrent; int max_stream_recv; apr_bucket_brigade *input; apr_bucket_brigade *output; -} h2_proxy_session; - -typedef struct h2_proxy_stream { - int id; - apr_pool_t *pool; - h2_proxy_session *session; - - const char *url; - request_rec *r; - h2_request *req; - - h2_stream_state_t state; - unsigned int data_received : 1; - - apr_bucket_brigade *input; - apr_bucket_brigade *output; - - apr_table_t *saves; -} h2_proxy_stream; +}; +h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn, + proxy_server_conf *conf, + h2_proxy_request_done *done); -h2_proxy_session *h2_proxy_session_setup(request_rec *r, proxy_conn_rec *p_connm, - proxy_server_conf *conf); +apr_status_t h2_proxy_session_submit(h2_proxy_session *s, const char *url, + request_rec *r); + +apr_status_t h2_proxy_session_process(h2_proxy_session *s); -apr_status_t h2_proxy_session_open_stream(h2_proxy_session *s, const char *url, - request_rec *r, h2_proxy_stream **pstream); -apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream); #define H2_PROXY_REQ_URL_NOTE "h2-proxy-req-url" diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 0eb68a54be..cd49843a0e 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -103,8 +103,6 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id) return stream; } -#ifdef H2_NG2_STREAM_API - /** * Determine the importance of streams when scheduling tasks. * - if both stream depend on the same one, compare weights @@ -158,20 +156,6 @@ static int stream_pri_cmp(int sid1, int sid2, void *ctx) return spri_cmp(sid1, s1, sid2, s2, session); } -#else /* ifdef H2_NG2_STREAM_API */ - -/* In absence of nghttp2_stream API, which gives information about - * priorities since nghttp2 1.3.x, we just sort the streams by - * their identifier, aka. order of arrival. - */ -static int stream_pri_cmp(int sid1, int sid2, void *ctx) -{ - (void)ctx; - return sid1 - sid2; -} - -#endif /* (ifdef else) H2_NG2_STREAM_API */ - static apr_status_t stream_schedule(h2_session *session, h2_stream *stream, int eos) { diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 9fe003247d..8ccb279419 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -298,6 +298,8 @@ apr_status_t h2_task_freeze(h2_task *task, request_rec *r) task->frozen = 1; task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc); ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->output->c, + "h2_task(%s), frozen", task->id); } return APR_SUCCESS; } @@ -306,6 +308,8 @@ apr_status_t h2_task_thaw(h2_task *task) { if (task->frozen) { task->frozen = 0; + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->output->c, + "h2_task(%s), thawed", task->id); } return APR_SUCCESS; } diff --git a/modules/http2/h2_task_input.c b/modules/http2/h2_task_input.c index 953433d459..7409c363db 100644 --- a/modules/http2/h2_task_input.c +++ b/modules/http2/h2_task_input.c @@ -50,6 +50,7 @@ h2_task_input *h2_task_input_create(h2_task *task, conn_rec *c) input->c = c; input->task = task; input->bb = NULL; + input->block = APR_BLOCK_READ; if (task->ser_headers) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, @@ -75,6 +76,11 @@ h2_task_input *h2_task_input_create(h2_task *task, conn_rec *c) return input; } +void h2_task_input_block_set(h2_task_input *input, apr_read_type_e block) +{ + input->block = block; +} + apr_status_t h2_task_input_read(h2_task_input *input, ap_filter_t* f, apr_bucket_brigade* bb, @@ -115,7 +121,7 @@ apr_status_t h2_task_input_read(h2_task_input *input, return APR_EOF; } - while ((bblen == 0) || (mode == AP_MODE_READBYTES && bblen < readbytes)) { + while (bblen == 0) { /* Get more data for our stream from mplx. */ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, @@ -124,27 +130,31 @@ apr_status_t h2_task_input_read(h2_task_input *input, input->task->id, block, (long)readbytes, (long)bblen); - /* Although we sometimes get called with APR_NONBLOCK_READs, - we need to fill our buffer blocking. Otherwise we get EAGAIN, - return that to our caller and everyone throws up their hands, - never calling us again. */ - status = h2_mplx_in_read(input->task->mplx, APR_BLOCK_READ, + /* Override the block mode we get called with depending on the input's + * setting. + */ + status = h2_mplx_in_read(input->task->mplx, block, input->task->stream_id, input->bb, f->r? f->r->trailers_in : NULL, input->task->io); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_task_input(%s): mplx in read returned", input->task->id); - if (status != APR_SUCCESS) { + if (APR_STATUS_IS_EAGAIN(status) + && (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) { + /* chunked input handling does not seem to like it if we + * return with APR_EAGAIN from a GETLINE read... + * upload 100k test on test-ser.example.org hangs */ + status = APR_SUCCESS; + } + else if (status != APR_SUCCESS) { return status; } + status = apr_brigade_length(input->bb, 1, &bblen); if (status != APR_SUCCESS) { return status; } - if ((bblen == 0) && (block == APR_NONBLOCK_READ)) { - return h2_util_has_eos(input->bb, -1)? APR_EOF : APR_EAGAIN; - } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_task_input(%s): mplx in read, %ld bytes in brigade", diff --git a/modules/http2/h2_task_input.h b/modules/http2/h2_task_input.h index 6085b78c8f..1488dcab49 100644 --- a/modules/http2/h2_task_input.h +++ b/modules/http2/h2_task_input.h @@ -29,6 +29,7 @@ struct h2_task_input { conn_rec *c; struct h2_task *task; apr_bucket_brigade *bb; + apr_read_type_e block; }; @@ -41,4 +42,6 @@ apr_status_t h2_task_input_read(h2_task_input *input, apr_read_type_e block, apr_off_t readbytes); +void h2_task_input_block_set(h2_task_input *input, apr_read_type_e block); + #endif /* defined(__mod_h2__h2_task_input__) */ diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index db717d9dc5..b64683ab17 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -1110,7 +1110,7 @@ int h2_util_frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen) size_t len = (frame->goaway.opaque_data_len < s_len)? frame->goaway.opaque_data_len : s_len-1; memcpy(scratch, frame->goaway.opaque_data, len); - scratch[len+1] = '\0'; + scratch[len] = '\0'; return apr_snprintf(buffer, maxlen, "GOAWAY[error=%d, reason='%s']", frame->goaway.error_code, scratch); } diff --git a/modules/http2/h2_worker.c b/modules/http2/h2_worker.c index 56feec118d..fd0e9bae06 100644 --- a/modules/http2/h2_worker.c +++ b/modules/http2/h2_worker.c @@ -61,6 +61,7 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) while (req) { conn_rec *c, *master = m->c; + h2_task *task; int stream_id = req->id; if (!task_pool) { @@ -81,38 +82,32 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) c = h2_slave_create(master, task_pool, worker->thread, worker->socket); - if (!c) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c, - APLOGNO(02957) "h2_request(%ld-%d): error setting up slave connection", - m->id, stream_id); - h2_mplx_out_rst(m, stream_id, H2_ERR_INTERNAL_ERROR); + + task = h2_task_create(m->id, req, task_pool, m); + h2_ctx_create_for(c, task); + + h2_task_do(task, c, worker->io, worker->socket); + + if (task->frozen) { + /* this task was handed over to someone else for processing */ + h2_task_thaw(task); + task_pool = NULL; + req = NULL; + h2_mplx_request_done(m, 0, worker->aborted? NULL : &req); } else { - h2_task *task; - - task = h2_task_create(m->id, req, task_pool, m); - h2_ctx_create_for(c, task); - h2_task_do(task, c, worker->io, worker->socket); - - if (task->frozen) { - /* this task was handed over to someone else for - * processing */ - task_pool = NULL; - } - task = NULL; + /* clean our references and report request as done. Signal + * that we want another unless we have been aborted */ + /* TODO: this will keep a worker attached to this h2_mplx as + * long as it has requests to handle. Might no be fair to + * other mplx's. Perhaps leave after n requests? */ apr_thread_cond_signal(worker->io); + if (task_pool) { + apr_pool_clear(task_pool); + } + req = NULL; + h2_mplx_request_done(m, stream_id, worker->aborted? NULL : &req); } - - /* clean our references and report request as done. Signal - * that we want another unless we have been aborted */ - /* TODO: this will keep a worker attached to this h2_mplx as - * long as it has requests to handle. Might no be fair to - * other mplx's. Perhaps leave after n requests? */ - req = NULL; - if (task_pool) { - apr_pool_clear(task_pool); - } - h2_mplx_request_done(&m, stream_id, worker->aborted? NULL : &req); } } diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index f7606dc4ed..9d648e9948 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -26,7 +26,6 @@ #include "h2_private.h" #include "h2_mplx.h" #include "h2_request.h" -#include "h2_task_queue.h" #include "h2_worker.h" #include "h2_workers.h" diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h index 7ec3881310..c6a24cab14 100644 --- a/modules/http2/h2_workers.h +++ b/modules/http2/h2_workers.h @@ -27,7 +27,6 @@ struct apr_thread_cond_t; struct h2_mplx; struct h2_request; struct h2_task; -struct h2_task_queue; typedef struct h2_workers h2_workers; diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c index 49de804bb6..87cac2f678 100644 --- a/modules/http2/mod_http2.c +++ b/modules/http2/mod_http2.c @@ -132,50 +132,24 @@ static apr_status_t http2_req_engine_push(const char *engine_type, request_rec *r, h2_req_engine_init *einit) { - h2_ctx *ctx = h2_ctx_rget(r); - if (ctx) { - h2_task *task = h2_ctx_get_task(ctx); - if (task) { - return h2_mplx_engine_push(task->mplx, task, engine_type, r, einit); - } - } - return APR_EINVAL; + return h2_mplx_engine_push(engine_type, r, einit); } static apr_status_t http2_req_engine_pull(h2_req_engine *engine, - apr_time_t timeout, request_rec **pr) + apr_read_type_e block, + request_rec **pr) { - h2_ctx *ctx = h2_ctx_get(engine->c, 0); - if (ctx) { - h2_task *task = h2_ctx_get_task(ctx); - if (task) { - return h2_mplx_engine_pull(task->mplx, task, engine, timeout, pr); - } - } - return APR_ECONNABORTED; + return h2_mplx_engine_pull(engine, block, pr); } static void http2_req_engine_done(h2_req_engine *engine, conn_rec *r_conn) { - h2_ctx *ctx = h2_ctx_get(r_conn, 0); - if (ctx) { - h2_task *task = h2_ctx_get_task(ctx); - if (task) { - h2_mplx_engine_done(task->mplx, task, r_conn); - /* task is destroyed */ - } - } + h2_mplx_engine_done(engine, r_conn); } static void http2_req_engine_exit(h2_req_engine *engine) { - h2_ctx *ctx = h2_ctx_get(engine->c, 0); - if (ctx) { - h2_task *task = h2_ctx_get_task(ctx); - if (task) { - h2_mplx_engine_exit(task->mplx, task, engine); - } - } + h2_mplx_engine_exit(engine); } diff --git a/modules/http2/mod_http2.dsp b/modules/http2/mod_http2.dsp index a1dfec4700..0431548a96 100644 --- a/modules/http2/mod_http2.dsp +++ b/modules/http2/mod_http2.dsp @@ -141,6 +141,10 @@ SOURCE=./h2_h2.c # End Source File # Begin Source File +SOURCE=./h2_int_queue.c +# End Source File +# Begin Source File + SOURCE=./h2_io.c # End Source File # Begin Source File @@ -193,10 +197,6 @@ SOURCE=./h2_task_output.c # End Source File # Begin Source File -SOURCE=./h2_task_queue.c -# End Source File -# Begin Source File - SOURCE=./h2_util.c # End Source File # Begin Source File diff --git a/modules/http2/mod_http2.h b/modules/http2/mod_http2.h index 8055320ef5..edacd0f134 100644 --- a/modules/http2/mod_http2.h +++ b/modules/http2/mod_http2.h @@ -29,9 +29,11 @@ APR_DECLARE_OPTIONAL_FN(int, /******************************************************************************* - * HTTP/2 slave engines + * HTTP/2 request engines ******************************************************************************/ +struct apr_thread_cond_t; + typedef struct h2_req_engine h2_req_engine; /** @@ -45,25 +47,17 @@ typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, request_rec *r); /** * The public structure of a h2_req_engine. It gets allocated by the http2 - * infrastructure, assigned id, type, pool and connection and passed to the + * infrastructure, assigned id, type, pool, io and connection and passed to the * h2_req_engine_init() callback to complete initialization. * This happens whenever a new request gets "push"ed for an engine type and * no instance, or no free instance, for the type is available. */ struct h2_req_engine { - int id; /* identifier, unique for a master connection */ - const char *type; /* name of the engine type */ + const char *id; /* identifier */ apr_pool_t *pool; /* pool for engine specific allocations */ - conn_rec *c; /* connection this engine is assigned to */ - apr_size_t r_capacity; /* request capacity engine is willing to handle, - may change between invocations. If the engine - sets this to 0, it signals that it no longer - wants more requests. New requests, already - scheduled for this engine might still arrive for - a time. */ - apr_size_t r_count; /* number of request currently assigned, it is the - responsibility of the engine to update this. */ - void *data; /* engine specific data */ + const char *type; /* name of the engine type */ + apr_size_t capacity; /* number of max assigned requests */ + void *user_data; /* user specific data */ }; /** @@ -95,7 +89,7 @@ APR_DECLARE_OPTIONAL_FN(apr_status_t, */ APR_DECLARE_OPTIONAL_FN(apr_status_t, http2_req_engine_pull, (h2_req_engine *engine, - apr_time_t timeout, + apr_read_type_e block, request_rec **pr)); APR_DECLARE_OPTIONAL_FN(void, http2_req_engine_done, (h2_req_engine *engine, diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index 1130d357c5..dbb041cb4b 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -21,6 +21,7 @@ #include "mod_proxy_http2.h" +#include "h2_int_queue.h" #include "h2_request.h" #include "h2_util.h" #include "h2_version.h" @@ -43,12 +44,13 @@ static int (*is_h2)(conn_rec *c); static apr_status_t (*req_engine_push)(const char *name, request_rec *r, h2_req_engine_init *einit); static apr_status_t (*req_engine_pull)(h2_req_engine *engine, - apr_time_t timeout, request_rec **pr); + apr_read_type_e block, request_rec **pr); static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn); static void (*req_engine_exit)(h2_req_engine *engine); typedef struct h2_proxy_ctx { conn_rec *owner; + request_rec *rbase; server_rec *server; const char *proxy_func; char server_portstr[32]; @@ -189,19 +191,51 @@ static int proxy_http2_canon(request_rec *r, char *url) static apr_status_t proxy_engine_init(h2_req_engine *engine, request_rec *r) { - h2_proxy_ctx *ctx = ap_get_module_config(engine->c->conn_config, + h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config, &proxy_http2_module); if (ctx) { + engine->capacity = 20; /* conservative guess until we know */ ctx->engine = engine; return APR_SUCCESS; } + ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, + "h2_proxy_session, engine init, no ctx found"); return APR_ENOTIMPL; } -static int proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) { - int status = OK; +static apr_status_t add_request(h2_proxy_session *session, request_rec *r) +{ + h2_proxy_ctx *ctx = session->user_data; + const char *url; + apr_status_t status; + + url = apr_table_get(r->notes, H2_PROXY_REQ_URL_NOTE); + status = h2_proxy_session_submit(session, url, r); + if (status != OK) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, status, r->connection, APLOGNO() + "pass request body failed to %pI (%s) from %s (%s)", + ctx->p_conn->addr, ctx->p_conn->hostname ? + ctx->p_conn->hostname: "", session->c->client_ip, + session->c->remote_host ? session->c->remote_host: ""); + } + return status; +} + +static void request_done(h2_proxy_session *session, request_rec *r) +{ + h2_proxy_ctx *ctx = session->user_data; + + if (req_engine_done && r != ctx->rbase) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, + "h2_proxy_session(%s): request %s", + ctx->engine->id, r->the_request); + req_engine_done(ctx->engine, r->connection); + } +} + +static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) { + apr_status_t status = OK; h2_proxy_session *session; - h2_proxy_stream *stream; /* Step Two: Make the Connection (or check that an already existing * socket is still usable). On success, we have a socket connected to @@ -245,52 +279,45 @@ static int proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) { /* Step Four: Send the Request in a new HTTP/2 stream and * loop until we got the response or encounter errors. */ - status = APR_ENOTIMPL; - session = h2_proxy_session_setup(r, ctx->p_conn, ctx->conf); + session = h2_proxy_session_setup(ctx->engine->id, ctx->p_conn, + ctx->conf, request_done); if (!session) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection, "session unavailable"); return HTTP_SERVICE_UNAVAILABLE; } - while (r) { - conn_rec *r_conn = r->connection; - const char *url; - - url = apr_table_get(r->notes, H2_PROXY_REQ_URL_NOTE); - status = h2_proxy_session_open_stream(session, url, r, &stream); - if (status == OK) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r_conn, - "process stream(%d): %s %s%s, original: %s", - stream->id, stream->req->method, - stream->req->authority, stream->req->path, - r->the_request); - status = h2_proxy_stream_process(stream); - } - r = NULL; - - if (status != OK) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, status, r_conn, APLOGNO() - "pass request body failed to %pI (%s) from %s (%s)", - ctx->p_conn->addr, ctx->p_conn->hostname ? - ctx->p_conn->hostname: "", session->c->client_ip, - session->c->remote_host ? session->c->remote_host: ""); - } - - if (!ctx->standalone && req_engine_done && r_conn != ctx->owner) { - req_engine_done(ctx->engine, r_conn); - } - r_conn = NULL; + session->user_data = ctx; + add_request(session, r); + + status = APR_EAGAIN; + while (APR_STATUS_IS_EAGAIN(status)) { + status = h2_proxy_session_process(session); - if (!ctx->standalone && req_engine_pull) { - status = req_engine_pull(ctx->engine, ctx->server->timeout, &r); - if (status != APR_SUCCESS) { - status = APR_SUCCESS; - break; + if (APR_STATUS_IS_EAGAIN(status) && !ctx->standalone) { + ctx->engine->capacity = session->remote_max_concurrent; + if (req_engine_pull(ctx->engine, APR_NONBLOCK_READ, &r) == APR_SUCCESS) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_proxy_session(%s): pulled request %s", + session->id, r->the_request); + add_request(session, r); } } } + if (session->state == H2_PROXYS_ST_DONE) { + ctx->p_conn->close = 1; + } + + if (session->streams && !h2_iq_empty(session->streams)) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, + ctx->p_conn->connection, + "session run done with %d streams unfinished", + h2_iq_size(session->streams)); + } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, + ctx->p_conn->connection, "session run done"); + session->user_data = NULL; return status; } @@ -339,6 +366,7 @@ static int proxy_http2_handler(request_rec *r, ctx = apr_pcalloc(p, sizeof(*ctx)); ctx->owner = c; + ctx->rbase = r; ctx->server = s; ctx->proxy_func = proxy_func; ctx->is_ssl = is_ssl; @@ -387,12 +415,12 @@ static int proxy_http2_handler(request_rec *r, * the same backend. We may be called to create an engine ourself. */ status = req_engine_push(engine_type, r, proxy_engine_init); - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, - "H2: pushing request %s to engine type %s", - url, engine_type); if (status == APR_SUCCESS && ctx->engine == NULL) { /* Another engine instance has taken over processing of this * request. */ + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, + "H2: pushed request %s to engine type %s", + url, engine_type); goto cleanup; } } @@ -401,25 +429,41 @@ static int proxy_http2_handler(request_rec *r, /* No engine was available or has been initialized, handle this * request just by ourself. */ h2_req_engine *engine = apr_pcalloc(p, sizeof(*engine)); - engine->id = 0; + engine->id = apr_psprintf(p, "eng-proxy-%ld", c->id); engine->type = engine_type; engine->pool = p; - engine->c = c; + engine->capacity = 1; ctx->engine = engine; ctx->standalone = 1; + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, + "h2_proxy_http2(%ld): setup standalone engine for type %s", + c->id, engine_type); + } + else { + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, + "H2: hosting engine %s for request %s", ctx->engine->id, url); } - status = proxy_engine_run(ctx, r); + status = proxy_engine_run(ctx, r); + if (!ctx->standalone && status == APR_SUCCESS) { + apr_status_t s2; + do { + s2 = req_engine_pull(ctx->engine, APR_BLOCK_READ, &r); + if (s2 == APR_SUCCESS) { + s2 = proxy_engine_run(ctx, r); + } + } while (s2 != APR_EOF); + } cleanup: - if (ctx->engine && !ctx->standalone && req_engine_exit) { + if (!ctx->standalone && ctx->engine && req_engine_exit) { req_engine_exit(ctx->engine); } ctx->engine = NULL; if (ctx) { if (ctx->p_conn) { - if (status != OK) { + if (status != APR_SUCCESS) { ctx->p_conn->close = 1; } proxy_run_detach_backend(r, ctx->p_conn); |