summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES5
-rw-r--r--CMakeLists.txt2
-rw-r--r--modules/http2/NWGNUmod_http22
-rw-r--r--modules/http2/config2.m49
-rw-r--r--modules/http2/h2_ctx.c12
-rw-r--r--modules/http2/h2_ctx.h2
-rw-r--r--modules/http2/h2_int_queue.c182
-rw-r--r--modules/http2/h2_int_queue.h103
-rw-r--r--modules/http2/h2_io.c17
-rw-r--r--modules/http2/h2_io.h7
-rw-r--r--modules/http2/h2_mplx.c369
-rw-r--r--modules/http2/h2_mplx.h28
-rw-r--r--modules/http2/h2_proxy_session.c839
-rw-r--r--modules/http2/h2_proxy_session.h78
-rw-r--r--modules/http2/h2_session.c16
-rw-r--r--modules/http2/h2_task.c4
-rw-r--r--modules/http2/h2_task_input.c30
-rw-r--r--modules/http2/h2_task_input.h3
-rw-r--r--modules/http2/h2_util.c2
-rw-r--r--modules/http2/h2_worker.c51
-rw-r--r--modules/http2/h2_workers.c1
-rw-r--r--modules/http2/h2_workers.h1
-rw-r--r--modules/http2/mod_http2.c38
-rw-r--r--modules/http2/mod_http2.dsp8
-rw-r--r--modules/http2/mod_http2.h24
-rw-r--r--modules/http2/mod_proxy_http2.c140
26 files changed, 1506 insertions, 467 deletions
diff --git a/CHANGES b/CHANGES
index c20b208b72..df83e57f0c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,10 @@
-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) mod_proxy_http2: using single connection for several requests *if*
+ master connection uses HTTP/2 itself. Not yet hardened under load.
+ [Stefan Eissing]
+
*) core: Added support for HTTP code 451. PR58985.
[Yehuda Katz <yehuda ymkatz.net>, Jim Jagielski]
@@ -22,6 +26,7 @@ Changes with Apache 2.5.0
*) mod_proxy_http2: new experimental http2 proxy module for h2: and h2c: proxy
urls. Uses, so far, one connection per request, reuses connections.
+ [Stefan Eissing]
*) event: use pre_connection hook to properly initialize connection state for
slave connections. use protocol_switch hook to initialize server config
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3ea412f02d..d9fc914a38 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -409,7 +409,7 @@ SET(mod_http2_extra_sources
modules/http2/h2_session.c modules/http2/h2_stream.c
modules/http2/h2_stream_set.c modules/http2/h2_switch.c
modules/http2/h2_task.c modules/http2/h2_task_input.c
- modules/http2/h2_task_output.c modules/http2/h2_task_queue.c
+ modules/http2/h2_task_output.c modules/http2/h2_int_queue.c
modules/http2/h2_util.c modules/http2/h2_worker.c
modules/http2/h2_workers.c
)
diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2
index dd4ac10d20..0e53b4ae1f 100644
--- a/modules/http2/NWGNUmod_http2
+++ b/modules/http2/NWGNUmod_http2
@@ -194,6 +194,7 @@ FILES_nlm_objs = \
$(OBJDIR)/h2_filter.o \
$(OBJDIR)/h2_from_h1.o \
$(OBJDIR)/h2_h2.o \
+ $(OBJDIR)/h2_int_queue.o \
$(OBJDIR)/h2_io.o \
$(OBJDIR)/h2_io_set.o \
$(OBJDIR)/h2_mplx.o \
@@ -207,7 +208,6 @@ FILES_nlm_objs = \
$(OBJDIR)/h2_task.o \
$(OBJDIR)/h2_task_input.o \
$(OBJDIR)/h2_task_output.o \
- $(OBJDIR)/h2_task_queue.o \
$(OBJDIR)/h2_util.o \
$(OBJDIR)/h2_worker.o \
$(OBJDIR)/h2_workers.o \
diff --git a/modules/http2/config2.m4 b/modules/http2/config2.m4
index 0c9871552f..85a635e6b3 100644
--- a/modules/http2/config2.m4
+++ b/modules/http2/config2.m4
@@ -29,6 +29,7 @@ h2_ctx.lo dnl
h2_filter.lo dnl
h2_from_h1.lo dnl
h2_h2.lo dnl
+h2_int_queue.lo dnl
h2_io.lo dnl
h2_io_set.lo dnl
h2_mplx.lo dnl
@@ -42,7 +43,6 @@ h2_switch.lo dnl
h2_task.lo dnl
h2_task_input.lo dnl
h2_task_output.lo dnl
-h2_task_queue.lo dnl
h2_util.lo dnl
h2_worker.lo dnl
h2_workers.lo dnl
@@ -156,8 +156,10 @@ AC_DEFUN([APACHE_CHECK_NGHTTP2],[
AC_MSG_WARN([nghttp2 library is unusable])
fi
dnl # nghttp2 >= 1.3.0: access to stream weights
- AC_CHECK_FUNCS([nghttp2_stream_get_weight],
- [APR_ADDTO(MOD_CPPFLAGS, ["-DH2_NG2_STREAM_API"])], [])
+ AC_CHECK_FUNCS([nghttp2_stream_get_weight], [], [liberrors="yes"])
+ if test "x$liberrors" != "x"; then
+ AC_MSG_WARN([nghttp2 version >= 1.3.0 is required])
+ fi
dnl # nghttp2 >= 1.5.0: changing stream priorities
AC_CHECK_FUNCS([nghttp2_session_change_stream_priority],
[APR_ADDTO(MOD_CPPFLAGS, ["-DH2_NG2_CHANGE_PRIO"])], [])
@@ -206,6 +208,7 @@ APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current])
dnl # list of module object files
proxy_http2_objs="dnl
mod_proxy_http2.lo dnl
+h2_int_queue.lo dnl
h2_proxy_session.lo dnl
h2_request.lo dnl
h2_util.lo dnl
diff --git a/modules/http2/h2_ctx.c b/modules/http2/h2_ctx.c
index b40037cb35..e8294fcb2b 100644
--- a/modules/http2/h2_ctx.c
+++ b/modules/http2/h2_ctx.c
@@ -101,7 +101,17 @@ int h2_ctx_is_task(h2_ctx *ctx)
return ctx && ctx->task;
}
-struct h2_task *h2_ctx_get_task(h2_ctx *ctx)
+h2_task *h2_ctx_get_task(h2_ctx *ctx)
{
return ctx? ctx->task : NULL;
}
+
+h2_task *h2_ctx_cget_task(conn_rec *c)
+{
+ return h2_ctx_get_task(h2_ctx_get(c, 0));
+}
+
+h2_task *h2_ctx_rget_task(request_rec *r)
+{
+ return h2_ctx_get_task(h2_ctx_rget(r));
+}
diff --git a/modules/http2/h2_ctx.h b/modules/http2/h2_ctx.h
index 68dc7c84c3..3b2c842cae 100644
--- a/modules/http2/h2_ctx.h
+++ b/modules/http2/h2_ctx.h
@@ -71,5 +71,7 @@ const char *h2_ctx_protocol_get(const conn_rec *c);
int h2_ctx_is_task(h2_ctx *ctx);
struct h2_task *h2_ctx_get_task(h2_ctx *ctx);
+struct h2_task *h2_ctx_cget_task(conn_rec *c);
+struct h2_task *h2_ctx_rget_task(request_rec *r);
#endif /* defined(__mod_h2__h2_ctx__) */
diff --git a/modules/http2/h2_int_queue.c b/modules/http2/h2_int_queue.c
new file mode 100644
index 0000000000..ba44afb71c
--- /dev/null
+++ b/modules/http2/h2_int_queue.c
@@ -0,0 +1,182 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stddef.h>
+#include <apr_pools.h>
+
+#include "h2_int_queue.h"
+
+
+static void tq_grow(h2_int_queue *q, int nlen);
+static void tq_swap(h2_int_queue *q, int i, int j);
+static int tq_bubble_up(h2_int_queue *q, int i, int top,
+ h2_iq_cmp *cmp, void *ctx);
+static int tq_bubble_down(h2_int_queue *q, int i, int bottom,
+ h2_iq_cmp *cmp, void *ctx);
+
+h2_int_queue *h2_iq_create(apr_pool_t *pool, int capacity)
+{
+ h2_int_queue *q = apr_pcalloc(pool, sizeof(h2_int_queue));
+ if (q) {
+ q->pool = pool;
+ tq_grow(q, capacity);
+ q->nelts = 0;
+ }
+ return q;
+}
+
+int h2_iq_empty(h2_int_queue *q)
+{
+ return q->nelts == 0;
+}
+
+int h2_iq_size(h2_int_queue *q)
+{
+ return q->nelts;
+}
+
+
+void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx)
+{
+ int i;
+
+ if (q->nelts >= q->nalloc) {
+ tq_grow(q, q->nalloc * 2);
+ }
+
+ i = (q->head + q->nelts) % q->nalloc;
+ q->elts[i] = sid;
+ ++q->nelts;
+
+ if (cmp) {
+ /* bubble it to the front of the queue */
+ tq_bubble_up(q, i, q->head, cmp, ctx);
+ }
+}
+
+int h2_iq_remove(h2_int_queue *q, int sid)
+{
+ int i;
+ for (i = 0; i < q->nelts; ++i) {
+ if (sid == q->elts[(q->head + i) % q->nalloc]) {
+ break;
+ }
+ }
+
+ if (i < q->nelts) {
+ ++i;
+ for (; i < q->nelts; ++i) {
+ q->elts[(q->head+i-1)%q->nalloc] = q->elts[(q->head+i)%q->nalloc];
+ }
+ --q->nelts;
+ return 1;
+ }
+ return 0;
+}
+
+void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx)
+{
+ /* Assume that changes in ordering are minimal. This needs,
+ * best case, q->nelts - 1 comparisions to check that nothing
+ * changed.
+ */
+ if (q->nelts > 0) {
+ int i, ni, prev, last;
+
+ /* Start at the end of the queue and create a tail of sorted
+ * entries. Make that tail one element longer in each iteration.
+ */
+ last = i = (q->head + q->nelts - 1) % q->nalloc;
+ while (i != q->head) {
+ prev = (q->nalloc + i - 1) % q->nalloc;
+
+ ni = tq_bubble_up(q, i, prev, cmp, ctx);
+ if (ni == prev) {
+ /* i bubbled one up, bubble the new i down, which
+ * keeps all tasks below i sorted. */
+ tq_bubble_down(q, i, last, cmp, ctx);
+ }
+ i = prev;
+ };
+ }
+}
+
+
+int h2_iq_shift(h2_int_queue *q)
+{
+ int sid;
+
+ if (q->nelts <= 0) {
+ return 0;
+ }
+
+ sid = q->elts[q->head];
+ q->head = (q->head + 1) % q->nalloc;
+ q->nelts--;
+
+ return sid;
+}
+
+static void tq_grow(h2_int_queue *q, int nlen)
+{
+ if (nlen > q->nalloc) {
+ int *nq = apr_pcalloc(q->pool, sizeof(int) * nlen);
+ if (q->nelts > 0) {
+ int l = ((q->head + q->nelts) % q->nalloc) - q->head;
+
+ memmove(nq, q->elts + q->head, sizeof(int) * l);
+ if (l < q->nelts) {
+ /* elts wrapped, append elts in [0, remain] to nq */
+ int remain = q->nelts - l;
+ memmove(nq + l, q->elts, sizeof(int) * remain);
+ }
+ }
+ q->elts = nq;
+ q->nalloc = nlen;
+ q->head = 0;
+ }
+}
+
+static void tq_swap(h2_int_queue *q, int i, int j)
+{
+ int x = q->elts[i];
+ q->elts[i] = q->elts[j];
+ q->elts[j] = x;
+}
+
+static int tq_bubble_up(h2_int_queue *q, int i, int top,
+ h2_iq_cmp *cmp, void *ctx)
+{
+ int prev;
+ while (((prev = (q->nalloc + i - 1) % q->nalloc), i != top)
+ && (*cmp)(q->elts[i], q->elts[prev], ctx) < 0) {
+ tq_swap(q, prev, i);
+ i = prev;
+ }
+ return i;
+}
+
+static int tq_bubble_down(h2_int_queue *q, int i, int bottom,
+ h2_iq_cmp *cmp, void *ctx)
+{
+ int next;
+ while (((next = (q->nalloc + i + 1) % q->nalloc), i != bottom)
+ && (*cmp)(q->elts[i], q->elts[next], ctx) > 0) {
+ tq_swap(q, next, i);
+ i = next;
+ }
+ return i;
+}
diff --git a/modules/http2/h2_int_queue.h b/modules/http2/h2_int_queue.h
new file mode 100644
index 0000000000..6cdd84c42b
--- /dev/null
+++ b/modules/http2/h2_int_queue.h
@@ -0,0 +1,103 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_int_queue__
+#define __mod_h2__h2_int_queue__
+
+/**
+ * h2_int_queue keeps a list of sorted h2_task* in ascending order.
+ */
+typedef struct h2_int_queue h2_int_queue;
+
+struct h2_int_queue {
+ int *elts;
+ int head;
+ int nelts;
+ int nalloc;
+ apr_pool_t *pool;
+};
+
+/**
+ * Comparator for two task to determine their order.
+ *
+ * @param s1 stream id to compare
+ * @param s2 stream id to compare
+ * @param ctx provided user data
+ * @return value is the same as for strcmp() and has the effect:
+ * == 0: s1 and s2 are treated equal in ordering
+ * < 0: s1 should be sorted before s2
+ * > 0: s2 should be sorted before s1
+ */
+typedef int h2_iq_cmp(int s1, int s2, void *ctx);
+
+
+/**
+ * Allocate a new queue from the pool and initialize.
+ * @param id the identifier of the queue
+ * @param pool the memory pool
+ */
+h2_int_queue *h2_iq_create(apr_pool_t *pool, int capacity);
+
+/**
+ * Return != 0 iff there are no tasks in the queue.
+ * @param q the queue to check
+ */
+int h2_iq_empty(h2_int_queue *q);
+
+/**
+ * Return the number of int in the queue.
+ * @param q the queue to get size on
+ */
+int h2_iq_size(h2_int_queue *q);
+
+/**
+ * Add a stream idto the queue.
+ *
+ * @param q the queue to append the task to
+ * @param sid the stream id to add
+ * @param cmp the comparator for sorting
+ * @param ctx user data for comparator
+ */
+void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx);
+
+/**
+ * Remove the stream id from the queue. Return != 0 iff task
+ * was found in queue.
+ * @param q the task queue
+ * @param sid the stream id to remove
+ * @return != 0 iff task was found in queue
+ */
+int h2_iq_remove(h2_int_queue *q, int sid);
+
+/**
+ * Sort the stream idqueue again. Call if the task ordering
+ * has changed.
+ *
+ * @param q the queue to sort
+ * @param cmp the comparator for sorting
+ * @param ctx user data for the comparator
+ */
+void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx);
+
+/**
+ * Get the first stream id from the queue or NULL if the queue is empty.
+ * The task will be removed.
+ *
+ * @param q the queue to get the first task from
+ * @return the first stream id of the queue, 0 if empty
+ */
+int h2_iq_shift(h2_int_queue *q);
+
+#endif /* defined(__mod_h2__h2_int_queue__) */
diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c
index 07953d1899..6107c01ea5 100644
--- a/modules/http2/h2_io.c
+++ b/modules/http2/h2_io.c
@@ -75,6 +75,11 @@ int h2_io_in_has_eos_for(h2_io *io)
return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1));
}
+int h2_io_in_has_data(h2_io *io)
+{
+ return io->bbin && h2_util_bb_has_data_or_eos(io->bbin);
+}
+
int h2_io_out_has_data(h2_io *io)
{
return io->bbout && h2_util_bb_has_data_or_eos(io->bbout);
@@ -256,6 +261,18 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
}
}
+ if (status == APR_SUCCESS && (!io->bbin || APR_BRIGADE_EMPTY(io->bbin))) {
+ if (io->eos_in) {
+ if (!io->eos_in_written) {
+ status = append_eos(io, bb, trailers);
+ io->eos_in_written = 1;
+ }
+ }
+ }
+
+ if (status == APR_SUCCESS && APR_BRIGADE_EMPTY(bb)) {
+ return APR_EAGAIN;
+ }
return status;
}
diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h
index f0b085b0d2..9c1584a376 100644
--- a/modules/http2/h2_io.h
+++ b/modules/http2/h2_io.h
@@ -63,9 +63,6 @@ struct h2_io {
apr_size_t input_consumed; /* how many bytes have been read */
int files_handles_owned;
-
- struct h2_task *task; /* parked task */
- request_rec *r; /* parked request */
};
/*******************************************************************************
@@ -101,6 +98,10 @@ int h2_io_in_has_eos_for(h2_io *io);
* Output data is available.
*/
int h2_io_out_has_data(h2_io *io);
+/**
+ * Input data is available.
+ */
+int h2_io_in_has_data(h2_io *io);
void h2_io_signal(h2_io *io, h2_io_op op);
void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout,
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 80b36c05ca..f5c4bb7acd 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -34,6 +34,7 @@
#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
+#include "h2_int_queue.h"
#include "h2_io.h"
#include "h2_io_set.h"
#include "h2_response.h"
@@ -44,7 +45,6 @@
#include "h2_task.h"
#include "h2_task_input.h"
#include "h2_task_output.h"
-#include "h2_task_queue.h"
#include "h2_worker.h"
#include "h2_workers.h"
#include "h2_util.h"
@@ -206,7 +206,13 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
return NULL;
}
- m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
+ status = apr_thread_cond_create(&m->request_done, m->pool);
+ if (status != APR_SUCCESS) {
+ h2_mplx_destroy(m);
+ return NULL;
+ }
+
+ m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
m->stream_ios = h2_io_set_create(m->pool);
m->ready_ios = h2_io_set_create(m->pool);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
@@ -296,7 +302,7 @@ static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error)
h2_io_set_remove(m->ready_ios, io);
if (!io->processing_started || io->processing_done) {
/* already finished or not even started yet */
- h2_tq_remove(m->q, io->id);
+ h2_iq_remove(m->q, io->id);
io_destroy(m, io, 1);
return 0;
}
@@ -324,6 +330,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
/* disable WINDOW_UPDATE callbacks */
h2_mplx_set_consumed_cb(m, NULL, NULL);
+ apr_thread_cond_broadcast(m->request_done);
while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
/* iterate until all ios have been orphaned or destroyed */
}
@@ -351,6 +358,8 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
"all h2_workers to return, have still %d requests outstanding",
m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios));
}
+ m->aborted = 1;
+ apr_thread_cond_broadcast(m->request_done);
}
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
@@ -408,7 +417,7 @@ static const h2_request *pop_request(h2_mplx *m)
{
const h2_request *req = NULL;
int sid;
- while (!m->aborted && !req && (sid = h2_tq_shift(m->q)) > 0) {
+ while (!m->aborted && !req && (sid = h2_iq_shift(m->q)) > 0) {
h2_io *io = h2_io_set_get(m->stream_ios, sid);
if (io) {
req = io->request;
@@ -421,17 +430,8 @@ static const h2_request *pop_request(h2_mplx *m)
return req;
}
-static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, request_rec *r)
-{
- if (!m->engine_queue) {
- apr_queue_create(&m->engine_queue, 200, m->pool);
- }
- return apr_queue_trypush(m->engine_queue, r);
-}
-
-void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq)
+void h2_mplx_request_done(h2_mplx *m, int stream_id, const h2_request **preq)
{
- h2_mplx *m = *pm;
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
@@ -440,49 +440,30 @@ void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq)
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): request(%d) done", m->id, stream_id);
if (io) {
- request_rec *r = io->r;
-
+ io->processing_done = 1;
+ h2_mplx_out_close(m, stream_id, NULL);
if (io->orphaned) {
- io->processing_done = 1;
- }
- else if (r) {
- /* A parked request which is being transferred from
- * one worker thread to another. This request_done call
- * was from the initial thread and now it is safe to
- * schedule it for further processing. */
- h2_task_thaw(io->task);
- io->task = NULL;
- io->r = NULL;
- h2_mplx_engine_schedule(*pm, r);
+ io_destroy(m, io, 0);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
}
else {
- io->processing_done = 1;
- }
-
- if (io->processing_done) {
- h2_io_out_close(io, NULL);
- if (io->orphaned) {
- io_destroy(m, io, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
- }
- else {
- /* hang around until the stream deregisteres */
- }
+ /* hang around until the stream deregisteres */
}
}
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): request(%d) done, no io found",
+ m->id, stream_id);
+ }
+ apr_thread_cond_broadcast(m->request_done);
}
if (preq) {
/* someone wants another request, if we have */
*preq = pop_request(m);
}
- if (!preq || !*preq) {
- /* No request to hand back to the worker, NULLify reference
- * and decrement count */
- *pm = NULL;
- }
leave_mutex(m, acquired);
}
}
@@ -935,6 +916,26 @@ int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
return has_eos;
}
+int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id)
+{
+ apr_status_t status;
+ int has_data = 0;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ if (io && !io->orphaned) {
+ has_data = h2_io_in_has_data(io);
+ }
+ else {
+ has_data = 0;
+ }
+ leave_mutex(m, acquired);
+ }
+ return has_data;
+}
+
int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
{
apr_status_t status;
@@ -1001,7 +1002,7 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
status = APR_ECONNABORTED;
}
else {
- h2_tq_sort(m->q, cmp, ctx);
+ h2_iq_sort(m->q, cmp, ctx);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): reprioritize tasks", m->id);
@@ -1050,8 +1051,8 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const h2_request *req,
status = h2_io_in_close(io);
}
- was_empty = h2_tq_empty(m->q);
- h2_tq_add(m->q, io->id, cmp, ctx);
+ was_empty = h2_iq_empty(m->q);
+ h2_iq_add(m->q, io->id, cmp, ctx);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
"h2_mplx(%ld-%d): process", m->c->id, stream_id);
@@ -1079,61 +1080,116 @@ const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more)
}
else {
req = pop_request(m);
- *has_more = !h2_tq_empty(m->q);
+ *has_more = !h2_iq_empty(m->q);
}
leave_mutex(m, acquired);
}
return req;
}
-apr_status_t h2_mplx_engine_push(h2_mplx *m, h2_task *task,
- const char *engine_type,
+
+/*******************************************************************************
+ * HTTP/2 request engines
+ ******************************************************************************/
+
+typedef struct h2_req_engine_i h2_req_engine_i;
+struct h2_req_engine_i {
+ h2_req_engine pub;
+ conn_rec *c; /* connection this engine is assigned to */
+ h2_mplx *m;
+ unsigned int shutdown : 1; /* engine is being shut down */
+ apr_thread_cond_t *io; /* condition var for waiting on data */
+ apr_queue_t *queue; /* queue of scheduled request_rec* */
+ apr_size_t no_assigned; /* # of assigned requests */
+ apr_size_t no_live; /* # of live */
+ apr_size_t no_finished; /* # of finished */
+};
+
+static apr_status_t h2_mplx_engine_schedule(h2_mplx *m,
+ h2_req_engine_i *engine,
+ request_rec *r)
+{
+ if (!engine->queue) {
+ apr_queue_create(&engine->queue, 100, engine->pub.pool);
+ }
+ return apr_queue_trypush(engine->queue, r);
+}
+
+
+apr_status_t h2_mplx_engine_push(const char *engine_type,
request_rec *r, h2_mplx_engine_init *einit)
{
apr_status_t status;
+ h2_mplx *m;
+ h2_task *task;
int acquired;
+ task = h2_ctx_rget_task(r);
+ if (!task) {
+ return APR_ECONNABORTED;
+ }
+ m = task->mplx;
AP_DEBUG_ASSERT(m);
+
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
if (!io || io->orphaned) {
status = APR_ECONNABORTED;
}
else {
- h2_req_engine *engine;
+ h2_req_engine_i *engine = (h2_req_engine_i*)m->engine;
apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id);
status = APR_EOF;
- engine = m->engine; /* just a single one for now */
+
if (task->ser_headers) {
/* Max compatibility, deny processing of this */
}
- else if (!engine && einit) {
- engine = apr_pcalloc(r->connection->pool, sizeof(*engine));
- engine->id = 1;
- engine->c = r->connection;
- engine->pool = r->connection->pool;
- engine->type = apr_pstrdup(engine->pool, engine_type);
-
- status = einit(engine, r);
- if (status == APR_SUCCESS) {
- m->engine = engine;
+ else if (engine && !strcmp(engine->pub.type, engine_type)) {
+ if (engine->shutdown
+ || engine->no_assigned >= H2MIN(engine->pub.capacity, 100)) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
- "h2_mplx(%ld): init engine %d (%s)",
- m->c->id, engine->id, engine->type);
+ "h2_mplx(%ld): engine shutdown or over %s",
+ m->c->id, engine->pub.id);
+ engine = NULL;
}
- }
- else if (engine && !strcmp(engine->type, engine_type)) {
- if (status == APR_SUCCESS) {
+ else if (h2_mplx_engine_schedule(m, engine, r) == APR_SUCCESS) {
/* this task will be processed in another thread,
* freeze any I/O for the time being. */
h2_task_freeze(task, r);
- io->task = task;
- io->r = r;
+ engine->no_assigned++;
+ status = APR_SUCCESS;
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r,
+ "h2_mplx(%ld): push request %s",
+ m->c->id, r->the_request);
+ }
+ else {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+ "h2_mplx(%ld): engine error adding req %s",
+ m->c->id, engine->pub.id);
+ engine = NULL;
+ }
+ }
+
+ if (!engine && einit) {
+ engine = apr_pcalloc(task->pool, sizeof(*engine));
+ engine->pub.id = apr_psprintf(task->pool, "eng-%ld-%d",
+ m->id, m->next_eng_id++);
+ engine->pub.pool = task->pool;
+ engine->pub.type = apr_pstrdup(task->pool, engine_type);
+ engine->c = r->connection;
+ engine->m = m;
+ engine->io = task->io;
+ engine->no_assigned = 1;
+ engine->no_live = 1;
+
+ status = einit(&engine->pub, r);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+ "h2_mplx(%ld): init engine %s (%s)",
+ m->c->id, engine->pub.id, engine->pub.type);
+ if (status == APR_SUCCESS) {
+ m->engine = &engine->pub;
}
- ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, r,
- "h2_mplx(%ld): push request %s",
- m->c->id, r->the_request);
}
}
@@ -1141,52 +1197,163 @@ apr_status_t h2_mplx_engine_push(h2_mplx *m, h2_task *task,
}
return status;
}
+
+static request_rec *get_non_frozen(apr_queue_t *equeue)
+{
+ request_rec *r, *first = NULL;
+ h2_task *task;
+ void *elem;
+
+ if (equeue) {
+ /* FIFO queue, try to find a request_rec whose task is not frozen */
+ while (apr_queue_trypop(equeue, &elem) == APR_SUCCESS) {
+ r = elem;
+ task = h2_ctx_rget_task(r);
+ AP_DEBUG_ASSERT(task);
+ if (!task->frozen) {
+ return r;
+ }
+ apr_queue_push(equeue, r);
+ if (!first) {
+ first = r;
+ }
+ else if (r == first) {
+ return NULL; /* walked the whole queue */
+ }
+ }
+ }
+ return NULL;
+}
+
+static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine,
+ apr_read_type_e block, request_rec **pr)
+{
+ request_rec *r;
+
+ AP_DEBUG_ASSERT(m);
+ AP_DEBUG_ASSERT(engine);
+ while (1) {
+ if (m->aborted) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): mplx abort while pulling requests %s",
+ m->id, engine->pub.id);
+ *pr = NULL;
+ return APR_EOF;
+ }
+
+ if (engine->queue && (r = get_non_frozen(engine->queue))) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+ "h2_mplx(%ld): request %s pulled by engine %s",
+ m->c->id, r->the_request, engine->pub.id);
+ engine->no_live++;
+ *pr = r;
+ return APR_SUCCESS;
+ }
+ else if (APR_NONBLOCK_READ == block) {
+ *pr = NULL;
+ return APR_EAGAIN;
+ }
+ else if (!engine->queue || !apr_queue_size(engine->queue)) {
+ engine->shutdown = 1;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): emtpy queue, shutdown engine %s",
+ m->id, engine->pub.id);
+ *pr = NULL;
+ return APR_EOF;
+ }
+ apr_thread_cond_timedwait(m->request_done, m->lock,
+ apr_time_from_msec(100));
+ }
+}
-apr_status_t h2_mplx_engine_pull(h2_mplx *m, h2_task *task,
- struct h2_req_engine *engine,
- apr_time_t timeout, request_rec **pr)
+apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine,
+ apr_read_type_e block, request_rec **pr)
{
+ h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+ h2_mplx *m = engine->m;
apr_status_t status;
int acquired;
- AP_DEBUG_ASSERT(m);
+ *pr = NULL;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- status = APR_ECONNABORTED;
- if (m->engine == engine && m->engine_queue) {
- void *elem;
- status = apr_queue_trypop(m->engine_queue, &elem);
- if (status == APR_SUCCESS) {
- *pr = elem;
- ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, *pr,
- "h2_mplx(%ld): request %s pulled by engine %d",
- m->c->id, (*pr)->the_request, engine->id);
- }
- }
+ status = engine_pull(m, engine, block, pr);
leave_mutex(m, acquired);
}
return status;
}
-void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn)
+static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task,
+ int waslive, int aborted)
{
- int stream_id = task->stream_id;
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+ "h2_mplx(%ld): task %s %s by %s",
+ m->id, task->id, aborted? "aborted":"done",
+ engine->pub.id);
h2_task_output_close(task->output);
- h2_mplx_request_done(&m, stream_id, NULL);
- apr_pool_destroy(r_conn->pool);
+ h2_mplx_request_done(m, task->stream_id, NULL);
+ apr_pool_destroy(task->pool);
+ engine->no_finished++;
+ if (waslive) engine->no_live--;
+ engine->no_assigned--;
+}
+
+void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn)
+{
+ h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+ h2_mplx *m = engine->m;
+ h2_task *task;
+ int acquired;
+
+ task = h2_ctx_cget_task(r_conn);
+ if (task && (enter_mutex(m, &acquired) == APR_SUCCESS)) {
+ engine_done(m, engine, task, 1, 0);
+ leave_mutex(m, acquired);
+ }
}
-void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task,
- struct h2_req_engine *engine)
+void h2_mplx_engine_exit(h2_req_engine *pub_engine)
{
+ h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+ h2_mplx *m = engine->m;
int acquired;
- AP_DEBUG_ASSERT(m);
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- /* TODO: shutdown of engine->c */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): exit engine %d (%s)",
- m->c->id, engine->id, engine->type);
- m->engine = NULL;
+ if (engine->queue && apr_queue_size(engine->queue)) {
+ void *entry;
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): exit engine %s (%s), "
+ "has still %d requests queued, "
+ "assigned=%ld, live=%ld, finished=%ld",
+ m->c->id, engine->pub.id, engine->pub.type,
+ (int)apr_queue_size(engine->queue),
+ (long)engine->no_assigned, (long)engine->no_live,
+ (long)engine->no_finished);
+ while (apr_queue_trypop(engine->queue, &entry) == APR_SUCCESS) {
+ request_rec *r = entry;
+ h2_task *task = h2_ctx_rget_task(r);
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): engine %s has queued task %s, "
+ "frozen=%d, aborting",
+ m->c->id, engine->pub.id, task->id, task->frozen);
+ engine_done(m, engine, task, 0, 1);
+ }
+ }
+ if (engine->no_assigned > 1 || engine->no_live > 1) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): exit engine %s (%s), "
+ "assigned=%ld, live=%ld, finished=%ld",
+ m->c->id, engine->pub.id, engine->pub.type,
+ (long)engine->no_assigned, (long)engine->no_live,
+ (long)engine->no_finished);
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): exit engine %s (%s)",
+ m->c->id, engine->pub.id, engine->pub.type);
+ }
+ if (m->engine == &engine->pub) {
+ m->engine = NULL; /* TODO */
+ }
leave_mutex(m, acquired);
}
}
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h
index 837025f996..724cfe1c2e 100644
--- a/modules/http2/h2_mplx.h
+++ b/modules/http2/h2_mplx.h
@@ -46,7 +46,7 @@ struct h2_io_set;
struct apr_thread_cond_t;
struct h2_workers;
struct h2_stream_set;
-struct h2_task_queue;
+struct h2_int_queue;
struct h2_req_engine;
#include <apr_queue.h>
@@ -69,7 +69,7 @@ struct h2_mplx {
unsigned int aborted : 1;
- struct h2_task_queue *q;
+ struct h2_int_queue *q;
struct h2_io_set *stream_ios;
struct h2_io_set *ready_ios;
@@ -77,6 +77,7 @@ struct h2_mplx {
apr_thread_mutex_t *lock;
struct apr_thread_cond_t *added_output;
+ struct apr_thread_cond_t *request_done;
struct apr_thread_cond_t *join_wait;
apr_size_t stream_max_mem;
@@ -91,7 +92,9 @@ struct h2_mplx {
void *input_consumed_ctx;
struct h2_req_engine *engine;
+ /* TODO: signal for waiting tasks*/
apr_queue_t *engine_queue;
+ int next_eng_id;
};
@@ -127,7 +130,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait
*/
void h2_mplx_abort(h2_mplx *mplx);
-void h2_mplx_request_done(h2_mplx **pm, int stream_id, const struct h2_request **preq);
+void h2_mplx_request_done(h2_mplx *m, int stream_id, const struct h2_request **preq);
/**
* Get the highest stream identifier that has been passed on to processing.
@@ -151,10 +154,14 @@ int h2_mplx_get_max_stream_started(h2_mplx *m);
*/
apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
-/* Return != 0 iff the multiplexer has data for the given stream.
+/* Return != 0 iff the multiplexer has output data for the given stream.
*/
int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
+/* Return != 0 iff the multiplexer has input data for the given stream.
+ */
+int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id);
+
/**
* Waits on output data from any stream in this session to become available.
* Returns APR_TIMEUP if no data arrived in the given time.
@@ -385,17 +392,14 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx, link); \
typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine,
request_rec *r);
-apr_status_t h2_mplx_engine_push(h2_mplx *m, struct h2_task *task,
- const char *engine_type,
+apr_status_t h2_mplx_engine_push(const char *engine_type,
request_rec *r, h2_mplx_engine_init *einit);
-apr_status_t h2_mplx_engine_pull(h2_mplx *m, struct h2_task *task,
- struct h2_req_engine *engine,
- apr_time_t timeout, request_rec **pr);
+apr_status_t h2_mplx_engine_pull(struct h2_req_engine *engine,
+ apr_read_type_e block, request_rec **pr);
-void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn);
+void h2_mplx_engine_done(struct h2_req_engine *engine, conn_rec *r_conn);
-void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task,
- struct h2_req_engine *engine);
+void h2_mplx_engine_exit(struct h2_req_engine *engine);
#endif /* defined(__mod_h2__h2_mplx__) */
diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c
index 8f7b5d0a98..a55d7ec739 100644
--- a/modules/http2/h2_proxy_session.c
+++ b/modules/http2/h2_proxy_session.c
@@ -21,12 +21,33 @@
#include <mod_http2.h>
#include "h2.h"
+#include "h2_int_queue.h"
#include "h2_request.h"
#include "h2_util.h"
#include "h2_proxy_session.h"
APLOG_USE_MODULE(proxy_http2);
+typedef struct h2_proxy_stream {
+ int id;
+ apr_pool_t *pool;
+ h2_proxy_session *session;
+
+ const char *url;
+ request_rec *r;
+ h2_request *req;
+
+ h2_stream_state_t state;
+ unsigned int suspended : 1;
+ unsigned int data_received : 1;
+
+ apr_bucket_brigade *input;
+ apr_bucket_brigade *output;
+
+ apr_table_t *saves;
+} h2_proxy_stream;
+
+
static int ngstatus_from_apr_status(apr_status_t rv)
{
if (rv == APR_SUCCESS) {
@@ -41,19 +62,21 @@ static int ngstatus_from_apr_status(apr_status_t rv)
return NGHTTP2_ERR_PROTO;
}
+static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
+ int arg, const char *msg);
-static apr_status_t proxy_session_shutdown(void *theconn)
+
+static apr_status_t proxy_session_pre_close(void *theconn)
{
proxy_conn_rec *p_conn = (proxy_conn_rec *)theconn;
h2_proxy_session *session = p_conn->data;
if (session && session->ngh2) {
- if (session->c && !session->c->aborted && !session->goaway_sent) {
- nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE,
- session->max_stream_recv, 0, NULL, 0);
- nghttp2_session_send(session->ngh2);
- }
-
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "proxy_session(%s): shutdown, state=%d, streams=%d",
+ session->id, session->state,
+ h2_iq_size(session->streams));
+ dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
nghttp2_session_del(session->ngh2);
session->ngh2 = NULL;
p_conn->data = NULL;
@@ -109,8 +132,8 @@ static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
int flush = 1;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- "h2_proxy_sesssion(%ld): raw_send %d bytes, flush=%d",
- session->c->id, (int)length, flush);
+ "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d",
+ session->id, (int)length, flush);
b = apr_bucket_transient_create((const char*)data, length,
session->c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(session->output, b);
@@ -120,7 +143,7 @@ static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
session->output, flush);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
- "h2_proxy_sesssion(%ld): sending", session->c->id);
+ "h2_proxy_sesssion(%s): sending", session->id);
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
return length;
@@ -138,8 +161,8 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
- "h2_session(%ld): recv FRAME[%s]",
- session->c->id, buffer);
+ "h2_session(%s): recv FRAME[%s]",
+ session->id, buffer);
}
switch (frame->hd.type) {
@@ -150,9 +173,22 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
break;
case NGHTTP2_PUSH_PROMISE:
break;
+ case NGHTTP2_SETTINGS:
+ if (frame->settings.niv > 0) {
+ session->remote_max_concurrent = nghttp2_session_get_remote_settings(ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
+ }
+ break;
case NGHTTP2_GOAWAY:
- session->goaway_recvd = 1;
+ dispatch_event(session, H2_PROXYS_EV_REMOTE_GOAWAY, 0, NULL);
/* TODO: close handling */
+ if (APLOGcinfo(session->c)) {
+ char buffer[256];
+
+ h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO()
+ "h2_session(%s): recv FRAME[%s]",
+ session->id, buffer);
+ }
break;
default:
break;
@@ -164,13 +200,27 @@ static int before_frame_send(nghttp2_session *ngh2,
const nghttp2_frame *frame, void *user_data)
{
h2_proxy_session *session = user_data;
- if (APLOGcdebug(session->c)) {
- char buffer[256];
-
- h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03068)
- "h2_session(%ld): sent FRAME[%s]",
- session->c->id, buffer);
+ switch (frame->hd.type) {
+ case NGHTTP2_GOAWAY:
+ if (APLOGcinfo(session->c)) {
+ char buffer[256];
+
+ h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO()
+ "h2_session(%s): sent FRAME[%s]",
+ session->id, buffer);
+ }
+ break;
+ default:
+ if (APLOGcdebug(session->c)) {
+ char buffer[256];
+
+ h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
+ "h2_session(%s): sent FRAME[%s]",
+ session->id, buffer);
+ }
+ break;
}
return 0;
}
@@ -217,8 +267,8 @@ static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
char *s = apr_pstrndup(stream->pool, v, vlen);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
- "h2_proxy_stream(%ld-%d): got status %s",
- stream->session->c->id, stream->id, s);
+ "h2_proxy_stream(%s-%d): got status %s",
+ stream->session->id, stream->id, s);
stream->r->status = (int)apr_atoi64(s);
if (stream->r->status <= 0) {
stream->r->status = 500;
@@ -236,8 +286,8 @@ static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
hvalue = apr_pstrndup(stream->pool, v, vlen);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
- "h2_proxy_stream(%ld-%d): got header %s: %s",
- stream->session->c->id, stream->id, hname, hvalue);
+ "h2_proxy_stream(%s-%d): got header %s: %s",
+ stream->session->id, stream->id, hname, hvalue);
process_proxy_header(stream->r, hname, hvalue);
}
return APR_SUCCESS;
@@ -248,8 +298,8 @@ static int log_header(void *ctx, const char *key, const char *value)
h2_proxy_stream *stream = ctx;
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
- "h2_proxy_stream(%ld-%d), header_out %s: %s",
- stream->session->c->id, stream->id, key, value);
+ "h2_proxy_stream(%s-%d), header_out %s: %s",
+ stream->session->id, stream->id, key, value);
return 1;
}
@@ -307,8 +357,8 @@ static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream)
if (APLOGrtrace2(stream->r)) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
- "h2_proxy_stream(%ld-%d), header_out after merging",
- stream->session->c->id, stream->id);
+ "h2_proxy_stream(%s-%d), header_out after merging",
+ stream->session->id, stream->id);
apr_table_do(log_header, stream, stream->r->headers_out, NULL);
}
}
@@ -338,11 +388,15 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
b = apr_bucket_transient_create((const char*)data, len,
stream->r->connection->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->output, b);
+ if (flags & NGHTTP2_DATA_FLAG_EOF) {
+ b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->output, b);
+ }
status = ap_pass_brigade(stream->r->output_filters, stream->output);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO()
- "h2_session(%ld-%d): passing output",
- session->c->id, stream->id);
+ "h2_session(%s-%d): passing output",
+ session->id, stream->id);
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
return 0;
@@ -352,23 +406,7 @@ static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
uint32_t error_code, void *user_data)
{
h2_proxy_session *session = user_data;
- h2_proxy_stream *stream;
-
- stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
- if (!stream) {
- return 0;
- }
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- "h2_proxy_sesssion(%ld): closing stream(%d)",
- session->c->id, stream_id);
-
- if (!stream->data_received) {
- /* last chance to manipulate response headers.
- * after this, only trailers */
- stream->data_received = 1;
- }
- stream->state = H2_STREAM_ST_CLOSED;
+ dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
return 0;
}
@@ -413,11 +451,11 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
if (APR_BRIGADE_EMPTY(stream->input)) {
status = ap_get_brigade(stream->r->input_filters, stream->input,
- AP_MODE_READBYTES, APR_BLOCK_READ,
- H2MIN(APR_BUCKET_BUFF_SIZE, length));
+ AP_MODE_READBYTES, APR_NONBLOCK_READ,
+ H2MAX(APR_BUCKET_BUFF_SIZE, length));
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
- "h2_proxy_stream(%d): request body read",
- stream->id);
+ "h2_proxy_stream(%s-%d): request body read",
+ stream->session->id, stream->id);
}
if (status == APR_SUCCESS) {
@@ -459,31 +497,42 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
return readlen;
}
else if (APR_STATUS_IS_EAGAIN(status)) {
+ /* suspended stream, needs to be re-awakened */
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
+ "h2_proxy_stream(%s-%d): suspending",
+ stream->session->id, stream_id);
+ stream->suspended = 1;
+ h2_iq_add(stream->session->suspended, stream->id, NULL, NULL);
return NGHTTP2_ERR_DEFERRED;
}
return ngstatus_from_apr_status(status);
}
-h2_proxy_session *h2_proxy_session_setup(request_rec *r, proxy_conn_rec *p_conn,
- proxy_server_conf *conf)
+h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
+ proxy_server_conf *conf,
+ h2_proxy_request_done *done)
{
if (!p_conn->data) {
+ apr_pool_t *pool = p_conn->scpool;
h2_proxy_session *session;
- nghttp2_settings_entry settings[2];
nghttp2_session_callbacks *cbs;
- int add_conn_window;
- int rv;
+ nghttp2_option *option;
- session = apr_pcalloc(p_conn->scpool, sizeof(*session));
- apr_pool_pre_cleanup_register(p_conn->scpool, p_conn, proxy_session_shutdown);
+ session = apr_pcalloc(pool, sizeof(*session));
+ apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
p_conn->data = session;
+ session->id = apr_pstrdup(p_conn->scpool, id);
session->c = p_conn->connection;
session->p_conn = p_conn;
session->conf = conf;
session->pool = p_conn->scpool;
+ session->state = H2_PROXYS_ST_INIT;
session->window_bits_default = 30;
session->window_bits_connection = 30;
+ session->streams = h2_iq_create(pool, 25);
+ session->suspended = h2_iq_create(pool, 5);
+ session->done = done;
session->input = apr_brigade_create(session->pool, session->c->bucket_alloc);
session->output = apr_brigade_create(session->pool, session->c->bucket_alloc);
@@ -496,70 +545,41 @@ h2_proxy_session *h2_proxy_session_setup(request_rec *r, proxy_conn_rec *p_conn,
nghttp2_session_callbacks_set_before_frame_send_callback(cbs, before_frame_send);
nghttp2_session_callbacks_set_send_callback(cbs, raw_send);
- nghttp2_session_client_new(&session->ngh2, cbs, session);
+ nghttp2_option_new(&option);
+ nghttp2_option_set_peer_max_concurrent_streams(option, 20);
+
+ nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
+
+ nghttp2_option_del(option);
nghttp2_session_callbacks_del(cbs);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"setup session for %s", p_conn->hostname);
- settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
- settings[0].value = 0;
- settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
- settings[1].value = (1 << session->window_bits_default) - 1;
-
- rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings,
- H2_ALEN(settings));
-
- /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
- add_conn_window = ((1 << session->window_bits_connection) - 1 -
- NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE);
- if (!rv && add_conn_window != 0) {
- rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
- }
}
return p_conn->data;
}
-
-apr_status_t h2_proxy_session_open_stream(h2_proxy_session *session, const char *url,
- request_rec *r, h2_proxy_stream **pstream)
+static apr_status_t session_start(h2_proxy_session *session)
{
- h2_proxy_stream *stream;
- apr_uri_t puri;
- const char *authority, *scheme, *path;
-
- stream = apr_pcalloc(r->pool, sizeof(*stream));
-
- stream->pool = r->pool;
- stream->url = url;
- stream->r = r;
- stream->session = session;
- stream->state = H2_STREAM_ST_IDLE;
+ nghttp2_settings_entry settings[2];
+ int rv, add_conn_window;
- stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
- stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+ settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
+ settings[0].value = 0;
+ settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
+ settings[1].value = (1 << session->window_bits_default) - 1;
- stream->req = h2_request_create(1, stream->pool, 0);
-
- apr_uri_parse(stream->pool, url, &puri);
- scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
- authority = puri.hostname;
- if (!ap_strchr_c(authority, ':') && puri.port
- && apr_uri_port_of_scheme(scheme) != puri.port) {
- /* port info missing and port is not default for scheme: append */
- authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
- }
- path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
- h2_request_make(stream->req, stream->pool, r->method, scheme,
- authority, path, r->headers_in);
-
- /* Tuck away all already existing cookies */
- stream->saves = apr_table_make(r->pool, 2);
- apr_table_do(add_header, stream->saves, r->headers_out,"Set-Cookie", NULL);
-
- *pstream = stream;
+ rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings,
+ H2_ALEN(settings));
- return APR_SUCCESS;
+ /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
+ add_conn_window = ((1 << session->window_bits_connection) - 1 -
+ NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE);
+ if (!rv && add_conn_window != 0) {
+ rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
+ }
+ return rv? APR_EGENERAL : APR_SUCCESS;
}
static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
@@ -595,69 +615,57 @@ static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
- "h2_session(%ld): fed %ld bytes of input", session->c->id, (long)readlen);
+ "h2_session(%s): fed %ld bytes of input to session",
+ session->id, (long)readlen);
if (readlen == 0 && status == APR_SUCCESS) {
return APR_EAGAIN;
}
return status;
}
-
-static apr_status_t stream_loop(h2_proxy_stream *stream)
+static apr_status_t open_stream(h2_proxy_session *session, const char *url,
+ request_rec *r, h2_proxy_stream **pstream)
{
- h2_proxy_session *session = stream->session;
- apr_status_t status = APR_SUCCESS;
- int want_read, want_write;
+ h2_proxy_stream *stream;
+ apr_uri_t puri;
+ const char *authority, *scheme, *path;
+
+ stream = apr_pcalloc(r->pool, sizeof(*stream));
+
+ stream->pool = r->pool;
+ stream->url = url;
+ stream->r = r;
+ stream->session = session;
+ stream->state = H2_STREAM_ST_IDLE;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
- "h2_session(%ld): start loop for stream %d",
- session->c->id, stream->id);
- while ((status == APR_SUCCESS || APR_STATUS_IS_EAGAIN(status))
- && stream->state != H2_STREAM_ST_CLOSED) {
-
- want_read = nghttp2_session_want_read(session->ngh2);
- want_write = nghttp2_session_want_write(session->ngh2);
-
- if (want_write) {
- int rv = nghttp2_session_send(session->ngh2);
- if (rv < 0 && nghttp2_is_fatal(rv)) {
- status = APR_EGENERAL;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
- "h2_session(%ld): write, rv=%d", session->c->id, rv);
- break;
- }
- }
+ stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+ stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+
+ stream->req = h2_request_create(1, stream->pool, 0);
- if (want_read) {
- status = ap_get_brigade(session->c->input_filters, session->input,
- AP_MODE_READBYTES,
- (want_write? APR_NONBLOCK_READ : APR_BLOCK_READ),
- APR_BUCKET_BUFF_SIZE);
- if (status == APR_SUCCESS) {
- status = feed_brigade(session, session->input);
- }
- else if (!APR_STATUS_IS_EAGAIN(status)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
- "h2_session(%ld): read", session->c->id);
- break;
- }
- }
-
- if (!want_read && !want_write) {
- status = APR_EGENERAL;
- break;
- }
+ apr_uri_parse(stream->pool, url, &puri);
+ scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
+ authority = puri.hostname;
+ if (!ap_strchr_c(authority, ':') && puri.port
+ && apr_uri_port_of_scheme(scheme) != puri.port) {
+ /* port info missing and port is not default for scheme: append */
+ authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
}
+ path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
+ h2_request_make(stream->req, stream->pool, r->method, scheme,
+ authority, path, r->headers_in);
+
+ /* Tuck away all already existing cookies */
+ stream->saves = apr_table_make(r->pool, 2);
+ apr_table_do(add_header, stream->saves, r->headers_out,"Set-Cookie", NULL);
+
+ *pstream = stream;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
- "h2_session(%ld): end loop for stream %d",
- session->c->id, stream->id);
- return status;
+ return APR_SUCCESS;
}
-apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream)
+static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *stream)
{
- h2_proxy_session *session = stream->session;
h2_ngheader *hd;
nghttp2_data_provider *pp = NULL;
nghttp2_data_provider provider;
@@ -685,16 +693,525 @@ apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream)
const char *task_id = apr_table_get(stream->r->connection->notes,
H2_TASK_ID_NOTE);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_session(%ld): submit %s%s -> %d (task %s)",
- session->c->id, stream->req->authority, stream->req->path,
+ "h2_session(%s): submit %s%s -> %d (task %s)",
+ session->id, stream->req->authority, stream->req->path,
rv, task_id);
}
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_session(%s-%d): submit %s%s",
+ session->id, rv, stream->req->authority, stream->req->path);
+ }
+
if (rv > 0) {
stream->id = rv;
stream->state = H2_STREAM_ST_OPEN;
+ h2_iq_add(session->streams, stream->id, NULL, NULL);
+ dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL);
- return stream_loop(stream);
+ return APR_SUCCESS;
}
return APR_EGENERAL;
}
+static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block)
+{
+ apr_status_t status;
+ status = ap_get_brigade(session->c->input_filters, session->input,
+ AP_MODE_READBYTES,
+ block? APR_BLOCK_READ : APR_NONBLOCK_READ,
+ APR_BUCKET_BUFF_SIZE);
+ if (status == APR_SUCCESS) {
+ if (APR_BRIGADE_EMPTY(session->input)) {
+ status = APR_EAGAIN;
+ }
+ else {
+ feed_brigade(session, session->input);
+ }
+ }
+ else if (!APR_STATUS_IS_EAGAIN(status)) {
+ dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
+ }
+ return status;
+}
+
+apr_status_t h2_proxy_session_submit(h2_proxy_session *session,
+ const char *url, request_rec *r)
+{
+ h2_proxy_stream *stream;
+ apr_status_t status;
+
+ status = open_stream(session, url, r, &stream);
+ if (status == OK) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+ "process stream(%d): %s %s%s, original: %s",
+ stream->id, stream->req->method,
+ stream->req->authority, stream->req->path,
+ r->the_request);
+ status = submit_stream(session, stream);
+ }
+ return status;
+}
+
+static apr_status_t check_suspended(h2_proxy_session *session)
+{
+ h2_proxy_stream *stream;
+ int i, stream_id;
+ apr_status_t status;
+
+ for (i = 0; i < session->suspended->nelts; ++i) {
+ stream_id = session->suspended->elts[i];
+ stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+ if (stream) {
+ status = ap_get_brigade(stream->r->input_filters, stream->input,
+ AP_MODE_READBYTES, APR_NONBLOCK_READ,
+ APR_BUCKET_BUFF_SIZE);
+ if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->input)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
+ "h2_proxy_stream(%s-%d): resuming",
+ session->id, stream_id);
+ stream->suspended = 0;
+ h2_iq_remove(session->suspended, stream_id);
+ nghttp2_session_resume_data(session->ngh2, stream_id);
+ dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
+ check_suspended(session);
+ return APR_SUCCESS;
+ }
+ else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+ "h2_proxy_stream(%s-%d): check input",
+ session->id, stream_id);
+ h2_iq_remove(session->suspended, stream_id);
+ dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
+ check_suspended(session);
+ return APR_SUCCESS;
+ }
+ }
+ else {
+ /* gone? */
+ h2_iq_remove(session->suspended, stream_id);
+ check_suspended(session);
+ return APR_SUCCESS;
+ }
+ }
+ return APR_EAGAIN;
+}
+
+static apr_status_t session_shutdown(h2_proxy_session *session, int reason,
+ const char *msg)
+{
+ apr_status_t status = APR_SUCCESS;
+ const char *err = msg;
+
+ AP_DEBUG_ASSERT(session);
+ if (!err && reason) {
+ err = nghttp2_strerror(reason);
+ }
+ nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0,
+ reason, (uint8_t*)err, err? strlen(err):0);
+ status = nghttp2_session_send(session->ngh2);
+ dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err);
+ return status;
+}
+
+
+static const char *StateNames[] = {
+ "INIT", /* H2_PROXYS_ST_INIT */
+ "DONE", /* H2_PROXYS_ST_DONE */
+ "IDLE", /* H2_PROXYS_ST_IDLE */
+ "BUSY", /* H2_PROXYS_ST_BUSY */
+ "WAIT", /* H2_PROXYS_ST_WAIT */
+ "LSHUTDOWN", /* H2_PROXYS_ST_LOCAL_SHUTDOWN */
+ "RSHUTDOWN", /* H2_PROXYS_ST_REMOTE_SHUTDOWN */
+};
+
+static const char *state_name(h2_proxys_state state)
+{
+ if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
+ return "unknown";
+ }
+ return StateNames[state];
+}
+
+static int is_accepting_streams(h2_proxy_session *session)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_IDLE:
+ case H2_PROXYS_ST_BUSY:
+ case H2_PROXYS_ST_WAIT:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+static void transit(h2_proxy_session *session, const char *action,
+ h2_proxys_state nstate)
+{
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
+ "h2_proxy_session(%s): transit [%s] -- %s --> [%s]", session->id,
+ state_name(session->state), action, state_name(nstate));
+ session->state = nstate;
+}
+
+static void ev_init(h2_proxy_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_INIT:
+ if (h2_iq_empty(session->streams)) {
+ transit(session, "init", H2_PROXYS_ST_IDLE);
+ }
+ else {
+ transit(session, "init", H2_PROXYS_ST_BUSY);
+ }
+ break;
+
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void ev_local_goaway(h2_proxy_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+ /* already did that? */
+ break;
+ case H2_PROXYS_ST_IDLE:
+ case H2_PROXYS_ST_REMOTE_SHUTDOWN:
+ /* all done */
+ transit(session, "local goaway", H2_PROXYS_ST_DONE);
+ break;
+ default:
+ transit(session, "local goaway", H2_PROXYS_ST_LOCAL_SHUTDOWN);
+ break;
+ }
+}
+
+static void ev_remote_goaway(h2_proxy_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_REMOTE_SHUTDOWN:
+ /* already received that? */
+ break;
+ case H2_PROXYS_ST_IDLE:
+ case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+ /* all done */
+ transit(session, "remote goaway", H2_PROXYS_ST_DONE);
+ break;
+ default:
+ transit(session, "remote goaway", H2_PROXYS_ST_REMOTE_SHUTDOWN);
+ break;
+ }
+}
+
+static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_INIT:
+ case H2_PROXYS_ST_DONE:
+ case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+ /* just leave */
+ transit(session, "conn error", H2_PROXYS_ST_DONE);
+ break;
+
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_session(%s): conn error -> shutdown", session->id);
+ session_shutdown(session, arg, msg);
+ break;
+ }
+}
+
+static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_DONE:
+ case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+ /* just leave */
+ transit(session, "proto error", H2_PROXYS_ST_DONE);
+ break;
+
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_session(%s): proto error -> shutdown", session->id);
+ session_shutdown(session, arg, msg);
+ break;
+ }
+}
+
+static void ev_conn_timeout(h2_proxy_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+ transit(session, "conn timeout", H2_PROXYS_ST_DONE);
+ break;
+ default:
+ session_shutdown(session, arg, msg);
+ transit(session, "conn timeout", H2_PROXYS_ST_DONE);
+ break;
+ }
+}
+
+static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_BUSY:
+ case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+ case H2_PROXYS_ST_REMOTE_SHUTDOWN:
+ /* nothing for input and output to do. If we remain
+ * in this state, we go into a tight loop and suck up
+ * CPU cycles. Ideally, we'd like to do a blocking read, but that
+ * is not possible if we have scheduled tasks and wait
+ * for them to produce something. */
+ if (h2_iq_empty(session->streams)) {
+ if (!is_accepting_streams(session)) {
+ /* We are no longer accepting new streams and have
+ * finished processing existing ones. Time to leave. */
+ session_shutdown(session, arg, msg);
+ transit(session, "no io", H2_PROXYS_ST_DONE);
+ }
+ else {
+ /* When we have no streams, no task event are possible,
+ * switch to blocking reads */
+ transit(session, "no io", H2_PROXYS_ST_IDLE);
+ }
+ }
+ else {
+ /* Unable to do blocking reads, as we wait on events from
+ * task processing in other threads. Do a busy wait with
+ * backoff timer. */
+ transit(session, "no io", H2_PROXYS_ST_WAIT);
+ }
+ break;
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void ev_stream_submitted(h2_proxy_session *session, int stream_id,
+ const char *msg)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_IDLE:
+ case H2_PROXYS_ST_WAIT:
+ transit(session, "stream submitted", H2_PROXYS_ST_BUSY);
+ break;
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void ev_stream_done(h2_proxy_session *session, int stream_id,
+ const char *msg)
+{
+ h2_proxy_stream *stream;
+
+ stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+ if (stream) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_proxy_sesssion(%s): stream(%d) closed",
+ session->id, stream_id);
+ if (!stream->data_received) {
+ /* last chance to manipulate response headers.
+ * after this, only trailers */
+ stream->data_received = 1;
+ }
+ stream->state = H2_STREAM_ST_CLOSED;
+ h2_iq_remove(session->streams, stream_id);
+ h2_iq_remove(session->suspended, stream_id);
+ if (session->done) {
+ session->done(session, stream->r);
+ }
+ }
+
+ switch (session->state) {
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void ev_stream_resumed(h2_proxy_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_WAIT:
+ transit(session, "stream resumed", H2_PROXYS_ST_BUSY);
+ break;
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void ev_data_read(h2_proxy_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_IDLE:
+ case H2_PROXYS_ST_WAIT:
+ transit(session, "data read", H2_PROXYS_ST_BUSY);
+ break;
+ /* fall through */
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void ev_ngh2_done(h2_proxy_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_DONE:
+ /* nop */
+ break;
+ default:
+ transit(session, "nghttp2 done", H2_PROXYS_ST_DONE);
+ break;
+ }
+}
+
+static void ev_pre_close(h2_proxy_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_PROXYS_ST_DONE:
+ case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+ /* nop */
+ break;
+ default:
+ session_shutdown(session, arg, msg);
+ break;
+ }
+}
+
+static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
+ int arg, const char *msg)
+{
+ switch (ev) {
+ case H2_PROXYS_EV_INIT:
+ ev_init(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_LOCAL_GOAWAY:
+ ev_local_goaway(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_REMOTE_GOAWAY:
+ ev_remote_goaway(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_CONN_ERROR:
+ ev_conn_error(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_PROTO_ERROR:
+ ev_proto_error(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_CONN_TIMEOUT:
+ ev_conn_timeout(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_NO_IO:
+ ev_no_io(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_STREAM_SUBMITTED:
+ ev_stream_submitted(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_STREAM_DONE:
+ ev_stream_done(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_STREAM_RESUMED:
+ ev_stream_resumed(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_DATA_READ:
+ ev_data_read(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_NGH2_DONE:
+ ev_ngh2_done(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_PRE_CLOSE:
+ ev_pre_close(session, arg, msg);
+ break;
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_session(%s): unknown event %d",
+ session->id, ev);
+ break;
+ }
+}
+
+apr_status_t h2_proxy_session_process(h2_proxy_session *session)
+{
+ apr_status_t status;
+ int have_written = 0, have_read = 0;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_session(%s): process", session->id);
+
+ switch (session->state) {
+ case H2_PROXYS_ST_INIT:
+ status = session_start(session);
+ if (status == APR_SUCCESS) {
+ dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
+ }
+ else {
+ dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
+ }
+ break;
+
+ case H2_PROXYS_ST_BUSY:
+ case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+ case H2_PROXYS_ST_REMOTE_SHUTDOWN:
+ while (nghttp2_session_want_write(session->ngh2)) {
+ int rv = nghttp2_session_send(session->ngh2);
+ if (rv < 0 && nghttp2_is_fatal(rv)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_session(%s): write, rv=%d", session->id, rv);
+ dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
+ break;
+ }
+ have_written = 1;
+ }
+
+ if (nghttp2_session_want_read(session->ngh2)) {
+ if (h2_proxy_session_read(session, 0) == APR_SUCCESS) {
+ have_read = 1;
+ }
+ }
+
+ if (!have_written && !have_read
+ && !nghttp2_session_want_write(session->ngh2)) {
+ dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL);
+ }
+ break;
+
+ case H2_PROXYS_ST_WAIT:
+ if (check_suspended(session) == APR_EAGAIN) {
+ /* no stream has become resumed. Do a blocking read with
+ * ever increasing timeouts... */
+ if (h2_proxy_session_read(session, 0) == APR_SUCCESS) {
+ dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
+ }
+ }
+ break;
+
+ case H2_PROXYS_ST_IDLE:
+ return APR_SUCCESS;
+
+ case H2_PROXYS_ST_DONE:
+ return APR_SUCCESS;
+
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
+ APLOGNO()"h2_session(%s): unknown state %d",
+ session->id, session->state);
+ dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL);
+ break;
+ }
+
+
+ if (!nghttp2_session_want_read(session->ngh2)
+ && !nghttp2_session_want_write(session->ngh2)) {
+ dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
+ }
+
+ return APR_EAGAIN;
+}
+
diff --git a/modules/http2/h2_proxy_session.h b/modules/http2/h2_proxy_session.h
index 38924afdee..089fd107a5 100644
--- a/modules/http2/h2_proxy_session.h
+++ b/modules/http2/h2_proxy_session.h
@@ -20,50 +20,72 @@
#include <nghttp2/nghttp2.h>
-typedef struct h2_proxy_session {
+struct h2_int_queue;
+
+typedef enum {
+ H2_PROXYS_ST_INIT, /* send initial SETTINGS, etc. */
+ H2_PROXYS_ST_DONE, /* finished, connection close */
+ H2_PROXYS_ST_IDLE, /* no streams to process */
+ H2_PROXYS_ST_BUSY, /* read/write without stop */
+ H2_PROXYS_ST_WAIT, /* waiting for tasks reporting back */
+ H2_PROXYS_ST_LOCAL_SHUTDOWN, /* we announced GOAWAY */
+ H2_PROXYS_ST_REMOTE_SHUTDOWN, /* client announced GOAWAY */
+} h2_proxys_state;
+
+typedef enum {
+ H2_PROXYS_EV_INIT, /* session was initialized */
+ H2_PROXYS_EV_LOCAL_GOAWAY, /* we send a GOAWAY */
+ H2_PROXYS_EV_REMOTE_GOAWAY, /* remote send us a GOAWAY */
+ H2_PROXYS_EV_CONN_ERROR, /* connection error */
+ H2_PROXYS_EV_PROTO_ERROR, /* protocol error */
+ H2_PROXYS_EV_CONN_TIMEOUT, /* connection timeout */
+ H2_PROXYS_EV_NO_IO, /* nothing has been read or written */
+ H2_PROXYS_EV_STREAM_SUBMITTED, /* stream has been submitted */
+ H2_PROXYS_EV_STREAM_DONE, /* stream has been finished */
+ H2_PROXYS_EV_STREAM_RESUMED, /* stream signalled availability of headers/data */
+ H2_PROXYS_EV_DATA_READ, /* connection data has been read */
+ H2_PROXYS_EV_NGH2_DONE, /* nghttp2 wants neither read nor write anything */
+ H2_PROXYS_EV_PRE_CLOSE, /* connection will close after this */
+} h2_proxys_event_t;
+
+
+typedef struct h2_proxy_session h2_proxy_session;
+typedef void h2_proxy_request_done(h2_proxy_session *s, request_rec *r);
+
+struct h2_proxy_session {
+ const char *id;
conn_rec *c;
proxy_conn_rec *p_conn;
proxy_server_conf *conf;
apr_pool_t *pool;
nghttp2_session *ngh2; /* the nghttp2 session itself */
+ h2_proxy_request_done *done;
+ void *user_data;
+
int window_bits_default;
int window_bits_connection;
- unsigned int goaway_recvd : 1;
- unsigned int goaway_sent : 1;
-
+ h2_proxys_state state;
+
+ struct h2_int_queue *streams;
+ struct h2_int_queue *suspended;
+ apr_size_t remote_max_concurrent;
int max_stream_recv;
apr_bucket_brigade *input;
apr_bucket_brigade *output;
-} h2_proxy_session;
-
-typedef struct h2_proxy_stream {
- int id;
- apr_pool_t *pool;
- h2_proxy_session *session;
-
- const char *url;
- request_rec *r;
- h2_request *req;
-
- h2_stream_state_t state;
- unsigned int data_received : 1;
-
- apr_bucket_brigade *input;
- apr_bucket_brigade *output;
-
- apr_table_t *saves;
-} h2_proxy_stream;
+};
+h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
+ proxy_server_conf *conf,
+ h2_proxy_request_done *done);
-h2_proxy_session *h2_proxy_session_setup(request_rec *r, proxy_conn_rec *p_connm,
- proxy_server_conf *conf);
+apr_status_t h2_proxy_session_submit(h2_proxy_session *s, const char *url,
+ request_rec *r);
+
+apr_status_t h2_proxy_session_process(h2_proxy_session *s);
-apr_status_t h2_proxy_session_open_stream(h2_proxy_session *s, const char *url,
- request_rec *r, h2_proxy_stream **pstream);
-apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream);
#define H2_PROXY_REQ_URL_NOTE "h2-proxy-req-url"
diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c
index 0eb68a54be..cd49843a0e 100644
--- a/modules/http2/h2_session.c
+++ b/modules/http2/h2_session.c
@@ -103,8 +103,6 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
return stream;
}
-#ifdef H2_NG2_STREAM_API
-
/**
* Determine the importance of streams when scheduling tasks.
* - if both stream depend on the same one, compare weights
@@ -158,20 +156,6 @@ static int stream_pri_cmp(int sid1, int sid2, void *ctx)
return spri_cmp(sid1, s1, sid2, s2, session);
}
-#else /* ifdef H2_NG2_STREAM_API */
-
-/* In absence of nghttp2_stream API, which gives information about
- * priorities since nghttp2 1.3.x, we just sort the streams by
- * their identifier, aka. order of arrival.
- */
-static int stream_pri_cmp(int sid1, int sid2, void *ctx)
-{
- (void)ctx;
- return sid1 - sid2;
-}
-
-#endif /* (ifdef else) H2_NG2_STREAM_API */
-
static apr_status_t stream_schedule(h2_session *session,
h2_stream *stream, int eos)
{
diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c
index 9fe003247d..8ccb279419 100644
--- a/modules/http2/h2_task.c
+++ b/modules/http2/h2_task.c
@@ -298,6 +298,8 @@ apr_status_t h2_task_freeze(h2_task *task, request_rec *r)
task->frozen = 1;
task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc);
ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->output->c,
+ "h2_task(%s), frozen", task->id);
}
return APR_SUCCESS;
}
@@ -306,6 +308,8 @@ apr_status_t h2_task_thaw(h2_task *task)
{
if (task->frozen) {
task->frozen = 0;
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->output->c,
+ "h2_task(%s), thawed", task->id);
}
return APR_SUCCESS;
}
diff --git a/modules/http2/h2_task_input.c b/modules/http2/h2_task_input.c
index 953433d459..7409c363db 100644
--- a/modules/http2/h2_task_input.c
+++ b/modules/http2/h2_task_input.c
@@ -50,6 +50,7 @@ h2_task_input *h2_task_input_create(h2_task *task, conn_rec *c)
input->c = c;
input->task = task;
input->bb = NULL;
+ input->block = APR_BLOCK_READ;
if (task->ser_headers) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
@@ -75,6 +76,11 @@ h2_task_input *h2_task_input_create(h2_task *task, conn_rec *c)
return input;
}
+void h2_task_input_block_set(h2_task_input *input, apr_read_type_e block)
+{
+ input->block = block;
+}
+
apr_status_t h2_task_input_read(h2_task_input *input,
ap_filter_t* f,
apr_bucket_brigade* bb,
@@ -115,7 +121,7 @@ apr_status_t h2_task_input_read(h2_task_input *input,
return APR_EOF;
}
- while ((bblen == 0) || (mode == AP_MODE_READBYTES && bblen < readbytes)) {
+ while (bblen == 0) {
/* Get more data for our stream from mplx.
*/
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
@@ -124,27 +130,31 @@ apr_status_t h2_task_input_read(h2_task_input *input,
input->task->id, block,
(long)readbytes, (long)bblen);
- /* Although we sometimes get called with APR_NONBLOCK_READs,
- we need to fill our buffer blocking. Otherwise we get EAGAIN,
- return that to our caller and everyone throws up their hands,
- never calling us again. */
- status = h2_mplx_in_read(input->task->mplx, APR_BLOCK_READ,
+ /* Override the block mode we get called with depending on the input's
+ * setting.
+ */
+ status = h2_mplx_in_read(input->task->mplx, block,
input->task->stream_id, input->bb,
f->r? f->r->trailers_in : NULL,
input->task->io);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task_input(%s): mplx in read returned",
input->task->id);
- if (status != APR_SUCCESS) {
+ if (APR_STATUS_IS_EAGAIN(status)
+ && (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) {
+ /* chunked input handling does not seem to like it if we
+ * return with APR_EAGAIN from a GETLINE read...
+ * upload 100k test on test-ser.example.org hangs */
+ status = APR_SUCCESS;
+ }
+ else if (status != APR_SUCCESS) {
return status;
}
+
status = apr_brigade_length(input->bb, 1, &bblen);
if (status != APR_SUCCESS) {
return status;
}
- if ((bblen == 0) && (block == APR_NONBLOCK_READ)) {
- return h2_util_has_eos(input->bb, -1)? APR_EOF : APR_EAGAIN;
- }
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task_input(%s): mplx in read, %ld bytes in brigade",
diff --git a/modules/http2/h2_task_input.h b/modules/http2/h2_task_input.h
index 6085b78c8f..1488dcab49 100644
--- a/modules/http2/h2_task_input.h
+++ b/modules/http2/h2_task_input.h
@@ -29,6 +29,7 @@ struct h2_task_input {
conn_rec *c;
struct h2_task *task;
apr_bucket_brigade *bb;
+ apr_read_type_e block;
};
@@ -41,4 +42,6 @@ apr_status_t h2_task_input_read(h2_task_input *input,
apr_read_type_e block,
apr_off_t readbytes);
+void h2_task_input_block_set(h2_task_input *input, apr_read_type_e block);
+
#endif /* defined(__mod_h2__h2_task_input__) */
diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c
index db717d9dc5..b64683ab17 100644
--- a/modules/http2/h2_util.c
+++ b/modules/http2/h2_util.c
@@ -1110,7 +1110,7 @@ int h2_util_frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen)
size_t len = (frame->goaway.opaque_data_len < s_len)?
frame->goaway.opaque_data_len : s_len-1;
memcpy(scratch, frame->goaway.opaque_data, len);
- scratch[len+1] = '\0';
+ scratch[len] = '\0';
return apr_snprintf(buffer, maxlen, "GOAWAY[error=%d, reason='%s']",
frame->goaway.error_code, scratch);
}
diff --git a/modules/http2/h2_worker.c b/modules/http2/h2_worker.c
index 56feec118d..fd0e9bae06 100644
--- a/modules/http2/h2_worker.c
+++ b/modules/http2/h2_worker.c
@@ -61,6 +61,7 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
while (req) {
conn_rec *c, *master = m->c;
+ h2_task *task;
int stream_id = req->id;
if (!task_pool) {
@@ -81,38 +82,32 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
c = h2_slave_create(master, task_pool,
worker->thread, worker->socket);
- if (!c) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
- APLOGNO(02957) "h2_request(%ld-%d): error setting up slave connection",
- m->id, stream_id);
- h2_mplx_out_rst(m, stream_id, H2_ERR_INTERNAL_ERROR);
+
+ task = h2_task_create(m->id, req, task_pool, m);
+ h2_ctx_create_for(c, task);
+
+ h2_task_do(task, c, worker->io, worker->socket);
+
+ if (task->frozen) {
+ /* this task was handed over to someone else for processing */
+ h2_task_thaw(task);
+ task_pool = NULL;
+ req = NULL;
+ h2_mplx_request_done(m, 0, worker->aborted? NULL : &req);
}
else {
- h2_task *task;
-
- task = h2_task_create(m->id, req, task_pool, m);
- h2_ctx_create_for(c, task);
- h2_task_do(task, c, worker->io, worker->socket);
-
- if (task->frozen) {
- /* this task was handed over to someone else for
- * processing */
- task_pool = NULL;
- }
- task = NULL;
+ /* clean our references and report request as done. Signal
+ * that we want another unless we have been aborted */
+ /* TODO: this will keep a worker attached to this h2_mplx as
+ * long as it has requests to handle. Might no be fair to
+ * other mplx's. Perhaps leave after n requests? */
apr_thread_cond_signal(worker->io);
+ if (task_pool) {
+ apr_pool_clear(task_pool);
+ }
+ req = NULL;
+ h2_mplx_request_done(m, stream_id, worker->aborted? NULL : &req);
}
-
- /* clean our references and report request as done. Signal
- * that we want another unless we have been aborted */
- /* TODO: this will keep a worker attached to this h2_mplx as
- * long as it has requests to handle. Might no be fair to
- * other mplx's. Perhaps leave after n requests? */
- req = NULL;
- if (task_pool) {
- apr_pool_clear(task_pool);
- }
- h2_mplx_request_done(&m, stream_id, worker->aborted? NULL : &req);
}
}
diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c
index f7606dc4ed..9d648e9948 100644
--- a/modules/http2/h2_workers.c
+++ b/modules/http2/h2_workers.c
@@ -26,7 +26,6 @@
#include "h2_private.h"
#include "h2_mplx.h"
#include "h2_request.h"
-#include "h2_task_queue.h"
#include "h2_worker.h"
#include "h2_workers.h"
diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h
index 7ec3881310..c6a24cab14 100644
--- a/modules/http2/h2_workers.h
+++ b/modules/http2/h2_workers.h
@@ -27,7 +27,6 @@ struct apr_thread_cond_t;
struct h2_mplx;
struct h2_request;
struct h2_task;
-struct h2_task_queue;
typedef struct h2_workers h2_workers;
diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c
index 49de804bb6..87cac2f678 100644
--- a/modules/http2/mod_http2.c
+++ b/modules/http2/mod_http2.c
@@ -132,50 +132,24 @@ static apr_status_t http2_req_engine_push(const char *engine_type,
request_rec *r,
h2_req_engine_init *einit)
{
- h2_ctx *ctx = h2_ctx_rget(r);
- if (ctx) {
- h2_task *task = h2_ctx_get_task(ctx);
- if (task) {
- return h2_mplx_engine_push(task->mplx, task, engine_type, r, einit);
- }
- }
- return APR_EINVAL;
+ return h2_mplx_engine_push(engine_type, r, einit);
}
static apr_status_t http2_req_engine_pull(h2_req_engine *engine,
- apr_time_t timeout, request_rec **pr)
+ apr_read_type_e block,
+ request_rec **pr)
{
- h2_ctx *ctx = h2_ctx_get(engine->c, 0);
- if (ctx) {
- h2_task *task = h2_ctx_get_task(ctx);
- if (task) {
- return h2_mplx_engine_pull(task->mplx, task, engine, timeout, pr);
- }
- }
- return APR_ECONNABORTED;
+ return h2_mplx_engine_pull(engine, block, pr);
}
static void http2_req_engine_done(h2_req_engine *engine, conn_rec *r_conn)
{
- h2_ctx *ctx = h2_ctx_get(r_conn, 0);
- if (ctx) {
- h2_task *task = h2_ctx_get_task(ctx);
- if (task) {
- h2_mplx_engine_done(task->mplx, task, r_conn);
- /* task is destroyed */
- }
- }
+ h2_mplx_engine_done(engine, r_conn);
}
static void http2_req_engine_exit(h2_req_engine *engine)
{
- h2_ctx *ctx = h2_ctx_get(engine->c, 0);
- if (ctx) {
- h2_task *task = h2_ctx_get_task(ctx);
- if (task) {
- h2_mplx_engine_exit(task->mplx, task, engine);
- }
- }
+ h2_mplx_engine_exit(engine);
}
diff --git a/modules/http2/mod_http2.dsp b/modules/http2/mod_http2.dsp
index a1dfec4700..0431548a96 100644
--- a/modules/http2/mod_http2.dsp
+++ b/modules/http2/mod_http2.dsp
@@ -141,6 +141,10 @@ SOURCE=./h2_h2.c
# End Source File
# Begin Source File
+SOURCE=./h2_int_queue.c
+# End Source File
+# Begin Source File
+
SOURCE=./h2_io.c
# End Source File
# Begin Source File
@@ -193,10 +197,6 @@ SOURCE=./h2_task_output.c
# End Source File
# Begin Source File
-SOURCE=./h2_task_queue.c
-# End Source File
-# Begin Source File
-
SOURCE=./h2_util.c
# End Source File
# Begin Source File
diff --git a/modules/http2/mod_http2.h b/modules/http2/mod_http2.h
index 8055320ef5..edacd0f134 100644
--- a/modules/http2/mod_http2.h
+++ b/modules/http2/mod_http2.h
@@ -29,9 +29,11 @@ APR_DECLARE_OPTIONAL_FN(int,
/*******************************************************************************
- * HTTP/2 slave engines
+ * HTTP/2 request engines
******************************************************************************/
+struct apr_thread_cond_t;
+
typedef struct h2_req_engine h2_req_engine;
/**
@@ -45,25 +47,17 @@ typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, request_rec *r);
/**
* The public structure of a h2_req_engine. It gets allocated by the http2
- * infrastructure, assigned id, type, pool and connection and passed to the
+ * infrastructure, assigned id, type, pool, io and connection and passed to the
* h2_req_engine_init() callback to complete initialization.
* This happens whenever a new request gets "push"ed for an engine type and
* no instance, or no free instance, for the type is available.
*/
struct h2_req_engine {
- int id; /* identifier, unique for a master connection */
- const char *type; /* name of the engine type */
+ const char *id; /* identifier */
apr_pool_t *pool; /* pool for engine specific allocations */
- conn_rec *c; /* connection this engine is assigned to */
- apr_size_t r_capacity; /* request capacity engine is willing to handle,
- may change between invocations. If the engine
- sets this to 0, it signals that it no longer
- wants more requests. New requests, already
- scheduled for this engine might still arrive for
- a time. */
- apr_size_t r_count; /* number of request currently assigned, it is the
- responsibility of the engine to update this. */
- void *data; /* engine specific data */
+ const char *type; /* name of the engine type */
+ apr_size_t capacity; /* number of max assigned requests */
+ void *user_data; /* user specific data */
};
/**
@@ -95,7 +89,7 @@ APR_DECLARE_OPTIONAL_FN(apr_status_t,
*/
APR_DECLARE_OPTIONAL_FN(apr_status_t,
http2_req_engine_pull, (h2_req_engine *engine,
- apr_time_t timeout,
+ apr_read_type_e block,
request_rec **pr));
APR_DECLARE_OPTIONAL_FN(void,
http2_req_engine_done, (h2_req_engine *engine,
diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c
index 1130d357c5..dbb041cb4b 100644
--- a/modules/http2/mod_proxy_http2.c
+++ b/modules/http2/mod_proxy_http2.c
@@ -21,6 +21,7 @@
#include "mod_proxy_http2.h"
+#include "h2_int_queue.h"
#include "h2_request.h"
#include "h2_util.h"
#include "h2_version.h"
@@ -43,12 +44,13 @@ static int (*is_h2)(conn_rec *c);
static apr_status_t (*req_engine_push)(const char *name, request_rec *r,
h2_req_engine_init *einit);
static apr_status_t (*req_engine_pull)(h2_req_engine *engine,
- apr_time_t timeout, request_rec **pr);
+ apr_read_type_e block, request_rec **pr);
static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn);
static void (*req_engine_exit)(h2_req_engine *engine);
typedef struct h2_proxy_ctx {
conn_rec *owner;
+ request_rec *rbase;
server_rec *server;
const char *proxy_func;
char server_portstr[32];
@@ -189,19 +191,51 @@ static int proxy_http2_canon(request_rec *r, char *url)
static apr_status_t proxy_engine_init(h2_req_engine *engine, request_rec *r)
{
- h2_proxy_ctx *ctx = ap_get_module_config(engine->c->conn_config,
+ h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config,
&proxy_http2_module);
if (ctx) {
+ engine->capacity = 20; /* conservative guess until we know */
ctx->engine = engine;
return APR_SUCCESS;
}
+ ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r,
+ "h2_proxy_session, engine init, no ctx found");
return APR_ENOTIMPL;
}
-static int proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
- int status = OK;
+static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
+{
+ h2_proxy_ctx *ctx = session->user_data;
+ const char *url;
+ apr_status_t status;
+
+ url = apr_table_get(r->notes, H2_PROXY_REQ_URL_NOTE);
+ status = h2_proxy_session_submit(session, url, r);
+ if (status != OK) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, r->connection, APLOGNO()
+ "pass request body failed to %pI (%s) from %s (%s)",
+ ctx->p_conn->addr, ctx->p_conn->hostname ?
+ ctx->p_conn->hostname: "", session->c->client_ip,
+ session->c->remote_host ? session->c->remote_host: "");
+ }
+ return status;
+}
+
+static void request_done(h2_proxy_session *session, request_rec *r)
+{
+ h2_proxy_ctx *ctx = session->user_data;
+
+ if (req_engine_done && r != ctx->rbase) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
+ "h2_proxy_session(%s): request %s",
+ ctx->engine->id, r->the_request);
+ req_engine_done(ctx->engine, r->connection);
+ }
+}
+
+static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
+ apr_status_t status = OK;
h2_proxy_session *session;
- h2_proxy_stream *stream;
/* Step Two: Make the Connection (or check that an already existing
* socket is still usable). On success, we have a socket connected to
@@ -245,52 +279,45 @@ static int proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
/* Step Four: Send the Request in a new HTTP/2 stream and
* loop until we got the response or encounter errors.
*/
- status = APR_ENOTIMPL;
- session = h2_proxy_session_setup(r, ctx->p_conn, ctx->conf);
+ session = h2_proxy_session_setup(ctx->engine->id, ctx->p_conn,
+ ctx->conf, request_done);
if (!session) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection,
"session unavailable");
return HTTP_SERVICE_UNAVAILABLE;
}
- while (r) {
- conn_rec *r_conn = r->connection;
- const char *url;
-
- url = apr_table_get(r->notes, H2_PROXY_REQ_URL_NOTE);
- status = h2_proxy_session_open_stream(session, url, r, &stream);
- if (status == OK) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r_conn,
- "process stream(%d): %s %s%s, original: %s",
- stream->id, stream->req->method,
- stream->req->authority, stream->req->path,
- r->the_request);
- status = h2_proxy_stream_process(stream);
- }
- r = NULL;
-
- if (status != OK) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, status, r_conn, APLOGNO()
- "pass request body failed to %pI (%s) from %s (%s)",
- ctx->p_conn->addr, ctx->p_conn->hostname ?
- ctx->p_conn->hostname: "", session->c->client_ip,
- session->c->remote_host ? session->c->remote_host: "");
- }
-
- if (!ctx->standalone && req_engine_done && r_conn != ctx->owner) {
- req_engine_done(ctx->engine, r_conn);
- }
- r_conn = NULL;
+ session->user_data = ctx;
+ add_request(session, r);
+
+ status = APR_EAGAIN;
+ while (APR_STATUS_IS_EAGAIN(status)) {
+ status = h2_proxy_session_process(session);
- if (!ctx->standalone && req_engine_pull) {
- status = req_engine_pull(ctx->engine, ctx->server->timeout, &r);
- if (status != APR_SUCCESS) {
- status = APR_SUCCESS;
- break;
+ if (APR_STATUS_IS_EAGAIN(status) && !ctx->standalone) {
+ ctx->engine->capacity = session->remote_max_concurrent;
+ if (req_engine_pull(ctx->engine, APR_NONBLOCK_READ, &r) == APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_proxy_session(%s): pulled request %s",
+ session->id, r->the_request);
+ add_request(session, r);
}
}
}
+ if (session->state == H2_PROXYS_ST_DONE) {
+ ctx->p_conn->close = 1;
+ }
+
+ if (session->streams && !h2_iq_empty(session->streams)) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, status,
+ ctx->p_conn->connection,
+ "session run done with %d streams unfinished",
+ h2_iq_size(session->streams));
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status,
+ ctx->p_conn->connection, "session run done");
+ session->user_data = NULL;
return status;
}
@@ -339,6 +366,7 @@ static int proxy_http2_handler(request_rec *r,
ctx = apr_pcalloc(p, sizeof(*ctx));
ctx->owner = c;
+ ctx->rbase = r;
ctx->server = s;
ctx->proxy_func = proxy_func;
ctx->is_ssl = is_ssl;
@@ -387,12 +415,12 @@ static int proxy_http2_handler(request_rec *r,
* the same backend. We may be called to create an engine ourself.
*/
status = req_engine_push(engine_type, r, proxy_engine_init);
- ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
- "H2: pushing request %s to engine type %s",
- url, engine_type);
if (status == APR_SUCCESS && ctx->engine == NULL) {
/* Another engine instance has taken over processing of this
* request. */
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+ "H2: pushed request %s to engine type %s",
+ url, engine_type);
goto cleanup;
}
}
@@ -401,25 +429,41 @@ static int proxy_http2_handler(request_rec *r,
/* No engine was available or has been initialized, handle this
* request just by ourself. */
h2_req_engine *engine = apr_pcalloc(p, sizeof(*engine));
- engine->id = 0;
+ engine->id = apr_psprintf(p, "eng-proxy-%ld", c->id);
engine->type = engine_type;
engine->pool = p;
- engine->c = c;
+ engine->capacity = 1;
ctx->engine = engine;
ctx->standalone = 1;
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+ "h2_proxy_http2(%ld): setup standalone engine for type %s",
+ c->id, engine_type);
+ }
+ else {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+ "H2: hosting engine %s for request %s", ctx->engine->id, url);
}
- status = proxy_engine_run(ctx, r);
+ status = proxy_engine_run(ctx, r);
+ if (!ctx->standalone && status == APR_SUCCESS) {
+ apr_status_t s2;
+ do {
+ s2 = req_engine_pull(ctx->engine, APR_BLOCK_READ, &r);
+ if (s2 == APR_SUCCESS) {
+ s2 = proxy_engine_run(ctx, r);
+ }
+ } while (s2 != APR_EOF);
+ }
cleanup:
- if (ctx->engine && !ctx->standalone && req_engine_exit) {
+ if (!ctx->standalone && ctx->engine && req_engine_exit) {
req_engine_exit(ctx->engine);
}
ctx->engine = NULL;
if (ctx) {
if (ctx->p_conn) {
- if (status != OK) {
+ if (status != APR_SUCCESS) {
ctx->p_conn->close = 1;
}
proxy_run_detach_backend(r, ctx->p_conn);