summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Eissing <icing@apache.org>2017-04-10 15:04:55 +0000
committerStefan Eissing <icing@apache.org>2017-04-10 15:04:55 +0000
commit6db47ac4498920b4a9e4ef7bb7ff922b103cba4a (patch)
tree879b53b0dd988020539ec63ab77c50bdf89ec2f0
parent65b12b9dc673adecb1193685ac1ba238508b292b (diff)
downloadhttpd-6db47ac4498920b4a9e4ef7bb7ff922b103cba4a.tar.gz
On the 2.4.x branch:
Merged /httpd/httpd/trunk:r1789740,1790102,1790113,1790284,1790754,1790826-1790827,1790842 git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1790847 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--CHANGES7
-rw-r--r--modules/http2/h2_bucket_beam.c4
-rw-r--r--modules/http2/h2_filter.c31
-rw-r--r--modules/http2/h2_mplx.c653
-rw-r--r--modules/http2/h2_mplx.h4
-rw-r--r--modules/http2/h2_ngn_shed.c15
-rw-r--r--modules/http2/h2_proxy_session.c26
-rw-r--r--modules/http2/h2_proxy_util.c281
-rw-r--r--modules/http2/h2_proxy_util.h51
-rw-r--r--modules/http2/h2_session.c28
-rw-r--r--modules/http2/h2_session.h5
-rw-r--r--modules/http2/h2_stream.c171
-rw-r--r--modules/http2/h2_task.c2
-rw-r--r--modules/http2/h2_util.c88
-rw-r--r--modules/http2/h2_util.h28
-rw-r--r--modules/http2/h2_version.h4
-rw-r--r--modules/http2/h2_workers.c38
-rw-r--r--modules/http2/h2_workers.h4
-rw-r--r--modules/http2/mod_proxy_http2.c179
19 files changed, 1011 insertions, 608 deletions
diff --git a/CHANGES b/CHANGES
index 25198ab917..42d1f0b048 100644
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,10 @@
Changes with Apache 2.4.26
+ *) mod_proxy_http2: Fixed bug in re-attempting proxy requests after
+ connection error. Reliability of reconnect handling improved.
+ [Stefan Eissing]
+
*) mod_http2: better performance, eliminated need for nested locks and
thread privates. Moving request setups from the main connection to the
worker threads. Increase number of spare connections kept.
@@ -22,9 +26,6 @@ Changes with Apache 2.4.26
format from 2.2 in the Last Modified column. PR60846.
[Hank Ibell <hwibell gmail.com>]
- *) mod_http2: fixed PR60869 by making h2 workers exit explicitly waking up
- all threads to exit in a defined way. [Stefan Eissing]
-
*) core: Add %{REMOTE_PORT} to the expression parser. PR59938
[Hank Ibell <hwibell gmail.com>]
diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c
index 17ad3d95f1..872c67544d 100644
--- a/modules/http2/h2_bucket_beam.c
+++ b/modules/http2/h2_bucket_beam.c
@@ -1035,7 +1035,11 @@ transfer:
++transferred;
}
else {
+ /* let outside hook determine how bucket is beamed */
+ leave_yellow(beam, &bl);
brecv = h2_beam_bucket(beam, bb, bsender);
+ enter_yellow(beam, &bl);
+
while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
++transferred;
remain -= brecv->length;
diff --git a/modules/http2/h2_filter.c b/modules/http2/h2_filter.c
index 3a8a3b1ad1..c1f1a847d2 100644
--- a/modules/http2/h2_filter.c
+++ b/modules/http2/h2_filter.c
@@ -428,38 +428,41 @@ static void add_stats(apr_bucket_brigade *bb, h2_session *s,
static apr_status_t h2_status_insert(h2_task *task, apr_bucket *b)
{
- h2_mplx *m = task->mplx;
- h2_stream *stream = h2_mplx_stream_get(m, task->stream_id);
- h2_session *s;
- conn_rec *c;
-
+ conn_rec *c = task->c->master;
+ h2_ctx *h2ctx = h2_ctx_get(c, 0);
+ h2_session *session;
+ h2_stream *stream;
apr_bucket_brigade *bb;
apr_bucket *e;
int32_t connFlowIn, connFlowOut;
+
+ if (!h2ctx || (session = h2_ctx_session_get(h2ctx)) == NULL) {
+ return APR_SUCCESS;
+ }
+
+ stream = h2_session_stream_get(session, task->stream_id);
if (!stream) {
/* stream already done */
return APR_SUCCESS;
}
- s = stream->session;
- c = s->c;
bb = apr_brigade_create(stream->pool, c->bucket_alloc);
- connFlowIn = nghttp2_session_get_effective_local_window_size(s->ngh2);
- connFlowOut = nghttp2_session_get_remote_window_size(s->ngh2);
+ connFlowIn = nghttp2_session_get_effective_local_window_size(session->ngh2);
+ connFlowOut = nghttp2_session_get_remote_window_size(session->ngh2);
bbout(bb, "{\n");
bbout(bb, " \"version\": \"draft-01\",\n");
- add_settings(bb, s, 0);
- add_peer_settings(bb, s, 0);
+ add_settings(bb, session, 0);
+ add_peer_settings(bb, session, 0);
bbout(bb, " \"connFlowIn\": %d,\n", connFlowIn);
bbout(bb, " \"connFlowOut\": %d,\n", connFlowOut);
- bbout(bb, " \"sentGoAway\": %d,\n", s->local.shutdown);
+ bbout(bb, " \"sentGoAway\": %d,\n", session->local.shutdown);
- add_streams(bb, s, 0);
+ add_streams(bb, session, 0);
- add_stats(bb, s, stream, 1);
+ add_stats(bb, session, stream, 1);
bbout(bb, "}\n");
while ((e = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) {
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 04fbbd05cb..357bf5eaad 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -55,58 +55,29 @@ typedef struct {
apr_time_t now;
} stream_iter_ctx;
-/* NULL or the mutex hold by this thread, used for recursive calls
- */
-static const int nested_lock = 0;
-
-static apr_threadkey_t *thread_lock;
-
apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
{
- if (nested_lock) {
- return apr_threadkey_private_create(&thread_lock, NULL, pool);
- }
return APR_SUCCESS;
}
-static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
-{
- apr_status_t status;
-
- if (nested_lock) {
- void *mutex = NULL;
- /* Enter the mutex if this thread already holds the lock or
- * if we can acquire it. Only on the later case do we unlock
- * onleaving the mutex.
- * This allow recursive entering of the mutex from the saem thread,
- * which is what we need in certain situations involving callbacks
- */
- apr_threadkey_private_get(&mutex, thread_lock);
- if (mutex == m->lock) {
- *pacquired = 0;
- ap_assert(NULL); /* nested, why? */
- return APR_SUCCESS;
- }
- }
- status = apr_thread_mutex_lock(m->lock);
- *pacquired = (status == APR_SUCCESS);
- if (nested_lock && *pacquired) {
- apr_threadkey_private_set(m->lock, thread_lock);
- }
- return status;
-}
+#define H2_MPLX_ENTER(m) \
+ do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
+ return rv;\
+ } } while(0)
-static void leave_mutex(h2_mplx *m, int acquired)
-{
- if (acquired) {
- if (nested_lock) {
- apr_threadkey_private_set(NULL, thread_lock);
- }
- apr_thread_mutex_unlock(m->lock);
- }
-}
+#define H2_MPLX_LEAVE(m) \
+ apr_thread_mutex_unlock(m->lock)
+
+#define H2_MPLX_ENTER_ALWAYS(m) \
+ apr_thread_mutex_lock(m->lock)
-static void check_data_for(h2_mplx *m, int stream_id);
+#define H2_MPLX_ENTER_MAYBE(m, lock) \
+ if (lock) apr_thread_mutex_lock(m->lock)
+
+#define H2_MPLX_LEAVE_MAYBE(m, lock) \
+ if (lock) apr_thread_mutex_unlock(m->lock)
+
+static void check_data_for(h2_mplx *m, h2_stream *stream, int lock);
static void stream_output_consumed(void *ctx,
h2_bucket_beam *beam, apr_off_t length)
@@ -155,6 +126,7 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream)
h2_stream_cleanup(stream);
h2_iq_remove(m->q, stream->id);
+ h2_fifo_remove(m->readyq, stream);
h2_ihash_remove(m->streams, stream->id);
h2_ihash_add(m->shold, stream);
@@ -240,7 +212,12 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
- m->readyq = h2_iq_create(m->pool, m->max_streams);
+
+ status = h2_fifo_set_create(&m->readyq, m->pool, m->max_streams);
+ if (status != APR_SUCCESS) {
+ apr_pool_destroy(m->pool);
+ return NULL;
+ }
m->workers = workers;
m->max_active = workers->max_workers;
@@ -259,14 +236,15 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
int h2_mplx_shutdown(h2_mplx *m)
{
- int acquired, max_stream_started = 0;
+ int max_stream_started = 0;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- max_stream_started = m->max_stream_started;
- /* Clear schedule queue, disabling existing streams from starting */
- h2_iq_clear(m->q);
- leave_mutex(m, acquired);
- }
+ H2_MPLX_ENTER(m);
+
+ max_stream_started = m->max_stream_started;
+ /* Clear schedule queue, disabling existing streams from starting */
+ h2_iq_clear(m->q);
+
+ H2_MPLX_LEAVE(m);
return max_stream_started;
}
@@ -341,12 +319,14 @@ static int stream_destroy_iter(void *ctx, void *val)
return 0;
}
-static void purge_streams(h2_mplx *m)
+static void purge_streams(h2_mplx *m, int lock)
{
if (!h2_ihash_empty(m->spurge)) {
+ H2_MPLX_ENTER_MAYBE(m, lock);
while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) {
/* repeat until empty */
}
+ H2_MPLX_LEAVE_MAYBE(m, lock);
}
}
@@ -363,18 +343,16 @@ static int stream_iter_wrap(void *ctx, void *stream)
apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
{
- apr_status_t status;
- int acquired;
+ stream_iter_ctx_t x;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- stream_iter_ctx_t x;
- x.cb = cb;
- x.ctx = ctx;
- h2_ihash_iter(m->streams, stream_iter_wrap, &x);
+ H2_MPLX_ENTER(m);
+
+ x.cb = cb;
+ x.ctx = ctx;
+ h2_ihash_iter(m->streams, stream_iter_wrap, &x);
- leave_mutex(m, acquired);
- }
- return status;
+ H2_MPLX_LEAVE(m);
+ return APR_SUCCESS;
}
static int report_stream_iter(void *ctx, void *val) {
@@ -430,14 +408,13 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
apr_status_t status;
int i, wait_secs = 60;
- int acquired;
/* How to shut down a h2 connection:
* 0. abort and tell the workers that no more tasks will come from us */
m->aborted = 1;
h2_workers_unregister(m->workers, m);
- enter_mutex(m, &acquired);
+ H2_MPLX_ENTER_ALWAYS(m);
/* How to shut down a h2 connection:
* 1. cancel all streams still active */
@@ -482,7 +459,7 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
h2_ihash_iter(m->shold, unexpected_stream_iter, m);
}
- leave_mutex(m, acquired);
+ H2_MPLX_LEAVE(m);
/* 5. unregister again, now that our workers are done */
h2_workers_unregister(m->workers, m);
@@ -493,41 +470,34 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
{
- apr_status_t status = APR_SUCCESS;
- int acquired;
+ H2_MPLX_ENTER(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- H2_STRM_MSG(stream, "cleanup"));
- stream_cleanup(m, stream);
- leave_mutex(m, acquired);
- }
- return status;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ H2_STRM_MSG(stream, "cleanup"));
+ stream_cleanup(m, stream);
+
+ H2_MPLX_LEAVE(m);
+ return APR_SUCCESS;
}
h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
{
h2_stream *s = NULL;
- int acquired;
- if ((enter_mutex(m, &acquired)) == APR_SUCCESS) {
- s = h2_ihash_get(m->streams, id);
- leave_mutex(m, acquired);
- }
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ s = h2_ihash_get(m->streams, id);
+
+ H2_MPLX_LEAVE(m);
return s;
}
static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
- h2_mplx *m = ctx;
- int acquired;
+ h2_stream *stream = ctx;
+ h2_mplx *m = stream->session->mplx;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id);
- check_data_for(m, beam->id);
- leave_mutex(m, acquired);
- }
+ check_data_for(m, stream, 1);
}
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
@@ -551,7 +521,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
}
h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
- h2_beam_on_produced(stream->output, output_produced, m);
+ h2_beam_on_produced(stream->output, output_produced, stream);
if (stream->task->output.copy_files) {
h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
}
@@ -561,24 +531,24 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
/* we might see some file buckets in the output, see
* if we have enough handles reserved. */
- check_data_for(m, stream->id);
+ check_data_for(m, stream, 0);
return status;
}
apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
{
apr_status_t status;
- int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- status = APR_ECONNABORTED;
- }
- else {
- status = out_open(m, stream_id, beam);
- }
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER(m);
+
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
}
+ else {
+ status = out_open(m, stream_id, beam);
+ }
+
+ H2_MPLX_LEAVE(m);
return status;
}
@@ -601,7 +571,7 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task)
status = h2_beam_close(task->output.beam);
h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
output_consumed_signal(m, task);
- check_data_for(m, task->stream_id);
+ check_data_for(m, stream, 0);
return status;
}
@@ -609,58 +579,61 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
apr_thread_cond_t *iowait)
{
apr_status_t status;
- int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- status = APR_ECONNABORTED;
- }
- else if (apr_atomic_read32(&m->event_pending) > 0) {
- status = APR_SUCCESS;
- }
- else {
- purge_streams(m);
- h2_ihash_iter(m->streams, report_consumption_iter, m);
- m->added_output = iowait;
- status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
- if (APLOGctrace2(m->c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): trywait on data for %f ms)",
- m->id, timeout/1000.0);
- }
- m->added_output = NULL;
+ H2_MPLX_ENTER(m);
+
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
+ }
+ else if (h2_mplx_has_master_events(m)) {
+ status = APR_SUCCESS;
+ }
+ else {
+ purge_streams(m, 0);
+ h2_ihash_iter(m->streams, report_consumption_iter, m);
+ m->added_output = iowait;
+ status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
+ if (APLOGctrace2(m->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): trywait on data for %f ms)",
+ m->id, timeout/1000.0);
}
- leave_mutex(m, acquired);
+ m->added_output = NULL;
}
+
+ H2_MPLX_LEAVE(m);
return status;
}
-static void check_data_for(h2_mplx *m, int stream_id)
+static void check_data_for(h2_mplx *m, h2_stream *stream, int lock)
{
- ap_assert(m);
- h2_iq_append(m->readyq, stream_id);
- apr_atomic_set32(&m->event_pending, 1);
- if (m->added_output) {
- apr_thread_cond_signal(m->added_output);
+ if (h2_fifo_push(m->readyq, stream) == APR_SUCCESS) {
+ apr_atomic_set32(&m->event_pending, 1);
+ H2_MPLX_ENTER_MAYBE(m, lock);
+ if (m->added_output) {
+ apr_thread_cond_signal(m->added_output);
+ }
+ H2_MPLX_LEAVE_MAYBE(m, lock);
}
}
apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
- int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- status = APR_ECONNABORTED;
- }
- else {
- h2_iq_sort(m->q, cmp, ctx);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): reprioritize tasks", m->id);
- }
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER(m);
+
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
+ }
+ else {
+ h2_iq_sort(m->q, cmp, ctx);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): reprioritize tasks", m->id);
+ status = APR_SUCCESS;
}
+
+ H2_MPLX_LEAVE(m);
return status;
}
@@ -682,29 +655,30 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
- int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- status = APR_ECONNABORTED;
+ H2_MPLX_ENTER(m);
+
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
+ }
+ else {
+ status = APR_SUCCESS;
+ h2_ihash_add(m->streams, stream);
+ if (h2_stream_is_ready(stream)) {
+ /* already have a response */
+ check_data_for(m, stream, 0);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ H2_STRM_MSG(stream, "process, add to readyq"));
}
else {
- h2_ihash_add(m->streams, stream);
- if (h2_stream_is_ready(stream)) {
- /* already have a response */
- check_data_for(m, stream->id);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- H2_STRM_MSG(stream, "process, add to readyq"));
- }
- else {
- h2_iq_add(m->q, stream->id, cmp, ctx);
- register_if_needed(m);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- H2_STRM_MSG(stream, "process, added to q"));
- }
+ h2_iq_add(m->q, stream->id, cmp, ctx);
+ register_if_needed(m);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ H2_STRM_MSG(stream, "process, added to q"));
}
- leave_mutex(m, acquired);
}
+
+ H2_MPLX_LEAVE(m);
return status;
}
@@ -762,22 +736,21 @@ static h2_task *next_stream_task(h2_mplx *m)
h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
{
h2_task *task = NULL;
- apr_status_t status;
- int acquired;
+ H2_MPLX_ENTER_ALWAYS(m);
+
*has_more = 0;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (!m->aborted) {
- task = next_stream_task(m);
- if (task != NULL && !h2_iq_empty(m->q)) {
- *has_more = 1;
- }
- else {
- m->is_registered = 0; /* h2_workers will discard this mplx */
- }
+ if (!m->aborted) {
+ task = next_stream_task(m);
+ if (task != NULL && !h2_iq_empty(m->q)) {
+ *has_more = 1;
+ }
+ else {
+ m->is_registered = 0; /* h2_workers will discard this mplx */
}
- leave_mutex(m, acquired);
}
+
+ H2_MPLX_LEAVE(m);
return task;
}
@@ -814,7 +787,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
if (task->engine) {
if (!m->aborted && !task->c->aborted
&& !h2_req_engine_is_shutdown(task->engine)) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(10022)
"h2_mplx(%ld): task(%s) has not-shutdown "
"engine(%s)", m->id, task->id,
h2_req_engine_get_id(task->engine));
@@ -845,35 +818,37 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
}
stream = h2_ihash_get(m->streams, task->stream_id);
- if (stream && !m->aborted && h2_ihash_get(m->sredo, stream->id)) {
- /* reset and schedule again */
- h2_task_redo(task);
- h2_ihash_remove(m->sredo, stream->id);
- h2_iq_add(m->q, stream->id, NULL, NULL);
- return;
- }
-
if (stream) {
- /* stream not cleaned up, stay around */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- H2_STRM_MSG(stream, "task_done, stream open"));
- /* more data will not arrive, resume the stream */
- if (stream->input) {
- h2_beam_mutex_disable(stream->input);
- h2_beam_leave(stream->input);
+ /* stream not done yet. */
+ if (!m->aborted && h2_ihash_get(m->sredo, stream->id)) {
+ /* reset and schedule again */
+ h2_task_redo(task);
+ h2_ihash_remove(m->sredo, stream->id);
+ h2_iq_add(m->q, stream->id, NULL, NULL);
}
- if (stream->output) {
- h2_beam_mutex_disable(stream->output);
+ else {
+ /* stream not cleaned up, stay around */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ H2_STRM_MSG(stream, "task_done, stream open"));
+ /* more data will not arrive, resume the stream */
+ check_data_for(m, stream, 0);
+
+ if (stream->input) {
+ h2_beam_leave(stream->input);
+ h2_beam_mutex_disable(stream->input);
+ }
+ if (stream->output) {
+ h2_beam_mutex_disable(stream->output);
+ }
}
- check_data_for(m, stream->id);
}
else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
+ /* stream is done, was just waiting for this. */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
H2_STRM_MSG(stream, "task_done, in hold"));
- /* stream was just waiting for us. */
if (stream->input) {
- h2_beam_mutex_disable(stream->input);
h2_beam_leave(stream->input);
+ h2_beam_mutex_disable(stream->input);
}
if (stream->output) {
h2_beam_mutex_disable(stream->output);
@@ -895,21 +870,21 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
- int acquired;
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ task_done(m, task, NULL);
+ --m->tasks_active;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- task_done(m, task, NULL);
- --m->tasks_active;
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
- if (ptask) {
- /* caller wants another task */
- *ptask = next_stream_task(m);
- }
- register_if_needed(m);
- leave_mutex(m, acquired);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
+ if (ptask) {
+ /* caller wants another task */
+ *ptask = next_stream_task(m);
}
+ register_if_needed(m);
+
+ H2_MPLX_LEAVE(m);
}
/*******************************************************************************
@@ -1001,52 +976,53 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
{
apr_status_t status = APR_SUCCESS;
apr_time_t now;
- int acquired;
+ apr_size_t scount;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- apr_size_t scount = h2_ihash_count(m->streams);
- if (scount > 0 && m->tasks_active) {
- /* If we have streams in connection state 'IDLE', meaning
- * all streams are ready to sent data out, but lack
- * WINDOW_UPDATEs.
- *
- * This is ok, unless we have streams that still occupy
- * h2 workers. As worker threads are a scarce resource,
- * we need to take measures that we do not get DoSed.
- *
- * This is what we call an 'idle block'. Limit the amount
- * of busy workers we allow for this connection until it
- * well behaves.
- */
- now = apr_time_now();
- m->last_idle_block = now;
- if (m->limit_active > 2
- && now - m->last_limit_change >= m->limit_change_interval) {
- if (m->limit_active > 16) {
- m->limit_active = 16;
- }
- else if (m->limit_active > 8) {
- m->limit_active = 8;
- }
- else if (m->limit_active > 4) {
- m->limit_active = 4;
- }
- else if (m->limit_active > 2) {
- m->limit_active = 2;
- }
- m->last_limit_change = now;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): decrease worker limit to %d",
- m->id, m->limit_active);
+ H2_MPLX_ENTER(m);
+
+ scount = h2_ihash_count(m->streams);
+ if (scount > 0 && m->tasks_active) {
+ /* If we have streams in connection state 'IDLE', meaning
+ * all streams are ready to send data out, but lack
+ * WINDOW_UPDATEs.
+ *
+ * This is ok, unless we have streams that still occupy
+ * h2 workers. As worker threads are a scarce resource,
+ * we need to take measures that we do not get DoSed.
+ *
+ * This is what we call an 'idle block'. Limit the amount
+ * of busy workers we allow for this connection until it
+ * well behaves.
+ */
+ now = apr_time_now();
+ m->last_idle_block = now;
+ if (m->limit_active > 2
+ && now - m->last_limit_change >= m->limit_change_interval) {
+ if (m->limit_active > 16) {
+ m->limit_active = 16;
}
-
- if (m->tasks_active > m->limit_active) {
- status = unschedule_slow_tasks(m);
+ else if (m->limit_active > 8) {
+ m->limit_active = 8;
+ }
+ else if (m->limit_active > 4) {
+ m->limit_active = 4;
+ }
+ else if (m->limit_active > 2) {
+ m->limit_active = 2;
}
+ m->last_limit_change = now;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): decrease worker limit to %d",
+ m->id, m->limit_active);
+ }
+
+ if (m->tasks_active > m->limit_active) {
+ status = unschedule_slow_tasks(m);
}
- register_if_needed(m);
- leave_mutex(m, acquired);
}
+ register_if_needed(m);
+
+ H2_MPLX_LEAVE(m);
return status;
}
@@ -1090,7 +1066,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
apr_status_t status;
h2_mplx *m;
h2_task *task;
- int acquired;
+ h2_stream *stream;
task = h2_ctx_rget_task(r);
if (!task) {
@@ -1098,17 +1074,17 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
}
m = task->mplx;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-
- if (stream) {
- status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER(m);
+
+ stream = h2_ihash_get(m->streams, task->stream_id);
+ if (stream) {
+ status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
+ }
+ else {
+ status = APR_ECONNABORTED;
}
+
+ H2_MPLX_LEAVE(m);
return status;
}
@@ -1120,35 +1096,36 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
apr_status_t status;
- int acquired;
+ int want_shutdown;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- int want_shutdown = (block == APR_BLOCK_READ);
+ H2_MPLX_ENTER(m);
- /* Take this opportunity to update output consummation
- * for this engine */
- ngn_out_update_windows(m, ngn);
-
- if (want_shutdown && !h2_iq_empty(m->q)) {
- /* For a blocking read, check first if requests are to be
- * had and, if not, wait a short while before doing the
- * blocking, and if unsuccessful, terminating read.
- */
+ want_shutdown = (block == APR_BLOCK_READ);
+
+ /* Take this opportunity to update output consumption
+ * for this engine */
+ ngn_out_update_windows(m, ngn);
+
+ if (want_shutdown && !h2_iq_empty(m->q)) {
+ /* For a blocking read, check first if requests are to be
+ * had and, if not, wait a short while before doing the
+ * blocking, and if unsuccessful, terminating read.
+ */
+ status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
+ if (APR_STATUS_IS_EAGAIN(status)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): start block engine pull", m->id);
+ apr_thread_cond_timedwait(m->task_thawed, m->lock,
+ apr_time_from_msec(20));
status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
- if (APR_STATUS_IS_EAGAIN(status)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): start block engine pull", m->id);
- apr_thread_cond_timedwait(m->task_thawed, m->lock,
- apr_time_from_msec(20));
- status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
- }
- }
- else {
- status = h2_ngn_shed_pull_request(shed, ngn, capacity,
- want_shutdown, pr);
}
- leave_mutex(m, acquired);
}
+ else {
+ status = h2_ngn_shed_pull_request(shed, ngn, capacity,
+ want_shutdown, pr);
+ }
+
+ H2_MPLX_LEAVE(m);
return status;
}
@@ -1159,29 +1136,29 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
if (task) {
h2_mplx *m = task->mplx;
- int acquired;
+ h2_stream *stream;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-
- ngn_out_update_windows(m, ngn);
- h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
-
- if (status != APR_SUCCESS && stream
- && h2_task_can_redo(task)
- && !h2_ihash_get(m->sredo, stream->id)) {
- h2_ihash_add(m->sredo, stream);
- }
- if (task->engine) {
- /* cannot report that as done until engine returns */
- }
- else {
- task_done(m, task, ngn);
- }
- /* Take this opportunity to update output consummation
- * for this engine */
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ stream = h2_ihash_get(m->streams, task->stream_id);
+
+ ngn_out_update_windows(m, ngn);
+ h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
+
+ if (status != APR_SUCCESS && stream
+ && h2_task_can_redo(task)
+ && !h2_ihash_get(m->sredo, stream->id)) {
+ h2_ihash_add(m->sredo, stream);
+ }
+
+ if (task->engine) {
+ /* cannot report that as done until engine returns */
}
+ else {
+ task_done(m, task, ngn);
+ }
+
+ H2_MPLX_LEAVE(m);
}
}
@@ -1198,65 +1175,47 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
stream_ev_callback *on_resume,
void *on_ctx)
{
- apr_status_t status;
- int acquired;
- int ids[100];
h2_stream *stream;
- size_t i, n;
+ int n;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): dispatch events", m->id);
- apr_atomic_set32(&m->event_pending, 0);
- purge_streams(m);
-
- /* update input windows for streams */
- h2_ihash_iter(m->streams, report_consumption_iter, m);
-
- if (!h2_iq_empty(m->readyq)) {
- n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
- for (i = 0; i < n; ++i) {
- stream = h2_ihash_get(m->streams, ids[i]);
- if (stream) {
- leave_mutex(m, acquired);
- on_resume(on_ctx, stream);
- enter_mutex(m, &acquired);
- }
- }
- }
- if (!h2_iq_empty(m->readyq)) {
- apr_atomic_set32(&m->event_pending, 1);
- }
- leave_mutex(m, acquired);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): dispatch events", m->id);
+ apr_atomic_set32(&m->event_pending, 0);
+
+ /* update input windows for streams */
+ h2_ihash_iter(m->streams, report_consumption_iter, m);
+ purge_streams(m, 1);
+
+ n = h2_fifo_count(m->readyq);
+ while (n > 0
+ && (h2_fifo_try_pull(m->readyq, (void**)&stream) == APR_SUCCESS)) {
+ --n;
+ on_resume(on_ctx, stream);
}
- return status;
+
+ return APR_SUCCESS;
}
-apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
+apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream)
{
- apr_status_t status;
- int acquired;
-
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- check_data_for(m, stream_id);
- leave_mutex(m, acquired);
- }
- return status;
+ check_data_for(m, stream, 1);
+ return APR_SUCCESS;
}
int h2_mplx_awaits_data(h2_mplx *m)
{
- apr_status_t status;
- int acquired, waiting = 1;
+ int waiting = 1;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (h2_ihash_empty(m->streams)) {
- waiting = 0;
- }
- if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
- waiting = 0;
- }
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ if (h2_ihash_empty(m->streams)) {
+ waiting = 0;
}
+ if ((h2_fifo_count(m->readyq) == 0)
+ && h2_iq_empty(m->q) && !m->tasks_active) {
+ waiting = 0;
+ }
+
+ H2_MPLX_LEAVE(m);
return waiting;
}
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h
index 82a98fce0a..ed332c8bc3 100644
--- a/modules/http2/h2_mplx.h
+++ b/modules/http2/h2_mplx.h
@@ -68,7 +68,7 @@ struct h2_mplx {
struct h2_ihash_t *spurge; /* all streams done, ready for destroy */
struct h2_iqueue *q; /* all stream ids that need to be started */
- struct h2_iqueue *readyq; /* all stream ids ready for output */
+ struct h2_fifo *readyq; /* all streams ready for output */
struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */
@@ -158,7 +158,7 @@ apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream);
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
struct apr_thread_cond_t *iowait);
-apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id);
+apr_status_t h2_mplx_keep_active(h2_mplx *m, struct h2_stream *stream);
/*******************************************************************************
* Stream processing.
diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c
index e0c40cfb23..27474ba22d 100644
--- a/modules/http2/h2_ngn_shed.c
+++ b/modules/http2/h2_ngn_shed.c
@@ -151,6 +151,7 @@ static void ngn_add_task(h2_req_engine *ngn, h2_task *task, request_rec *r)
entry->task = task;
entry->r = r;
H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
+ ngn->no_assigned++;
}
@@ -176,6 +177,17 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
task->assigned = NULL;
}
+ if (task->engine) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ "h2_ngn_shed(%ld): push task(%s) hosting engine %s "
+ "already with %d tasks",
+ shed->c->id, task->id, task->engine->id,
+ task->engine->no_assigned);
+ task->assigned = task->engine;
+ ngn_add_task(task->engine, task, r);
+ return APR_SUCCESS;
+ }
+
ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING);
if (ngn && !ngn->shutdown) {
/* this task will be processed in another thread,
@@ -187,7 +199,6 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
h2_task_freeze(task);
}
ngn_add_task(ngn, task, r);
- ngn->no_assigned++;
return APR_SUCCESS;
}
@@ -211,11 +222,11 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
status = einit(newngn, newngn->id, newngn->type, newngn->pool,
shed->req_buffer_size, r,
&newngn->out_consumed, &newngn->out_consumed_ctx);
+
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03395)
"h2_ngn_shed(%ld): create engine %s (%s)",
shed->c->id, newngn->id, newngn->type);
if (status == APR_SUCCESS) {
- ap_assert(task->engine == NULL);
newngn->task = task;
task->engine = newngn;
task->assigned = newngn;
diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c
index 49476e965b..f2fed906b7 100644
--- a/modules/http2/h2_proxy_session.c
+++ b/modules/http2/h2_proxy_session.c
@@ -242,7 +242,6 @@ static int add_header(void *table, const char *n, const char *v)
static void process_proxy_header(h2_proxy_stream *stream, const char *n, const char *v)
{
- request_rec *r = stream->r;
static const struct {
const char *name;
ap_proxy_header_reverse_map_fn func;
@@ -254,23 +253,26 @@ static void process_proxy_header(h2_proxy_stream *stream, const char *n, const c
{ "Set-Cookie", ap_proxy_cookie_reverse_map },
{ NULL, NULL }
};
+ request_rec *r = stream->r;
proxy_dir_conf *dconf;
int i;
- for (i = 0; transform_hdrs[i].name; ++i) {
- if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) {
+ dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
+ if (!dconf->preserve_host) {
+ for (i = 0; transform_hdrs[i].name; ++i) {
+ if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) {
+ apr_table_add(r->headers_out, n,
+ (*transform_hdrs[i].func)(r, dconf, v));
+ return;
+ }
+ }
+ if (!ap_cstr_casecmp("Link", n)) {
dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
apr_table_add(r->headers_out, n,
- (*transform_hdrs[i].func)(r, dconf, v));
+ h2_proxy_link_reverse_map(r, dconf,
+ stream->real_server_uri, stream->p_server_uri, v));
return;
- }
- }
- if (!ap_cstr_casecmp("Link", n)) {
- dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
- apr_table_add(r->headers_out, n,
- h2_proxy_link_reverse_map(r, dconf,
- stream->real_server_uri, stream->p_server_uri, v));
- return;
+ }
}
apr_table_add(r->headers_out, n, v);
}
diff --git a/modules/http2/h2_proxy_util.c b/modules/http2/h2_proxy_util.c
index b92a876f42..206020fd87 100644
--- a/modules/http2/h2_proxy_util.c
+++ b/modules/http2/h2_proxy_util.c
@@ -16,6 +16,8 @@
#include <assert.h>
#include <apr_lib.h>
#include <apr_strings.h>
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
#include <httpd.h>
#include <http_core.h>
@@ -1053,3 +1055,282 @@ const char *h2_proxy_link_reverse_map(request_rec *r,
"link_reverse_map %s --> %s", s, ctx.s);
return ctx.s;
}
+
+/*******************************************************************************
+ * FIFO queue
+ ******************************************************************************/
+
+struct h2_proxy_fifo {
+ void **elems;
+ int nelems;
+ int set;
+ int head;
+ int count;
+ int aborted;
+ apr_thread_mutex_t *lock;
+ apr_thread_cond_t *not_empty;
+ apr_thread_cond_t *not_full;
+};
+
+static int nth_index(h2_proxy_fifo *fifo, int n)
+{
+ return (fifo->head + n) % fifo->nelems;
+}
+
+static apr_status_t fifo_destroy(void *data)
+{
+ h2_proxy_fifo *fifo = data;
+
+ apr_thread_cond_destroy(fifo->not_empty);
+ apr_thread_cond_destroy(fifo->not_full);
+ apr_thread_mutex_destroy(fifo->lock);
+
+ return APR_SUCCESS;
+}
+
+static int index_of(h2_proxy_fifo *fifo, void *elem)
+{
+ int i;
+
+ for (i = 0; i < fifo->count; ++i) {
+ if (elem == fifo->elems[nth_index(fifo, i)]) {
+ return i;
+ }
+ }
+ return -1;
+}
+
+static apr_status_t create_int(h2_proxy_fifo **pfifo, apr_pool_t *pool,
+ int capacity, int as_set)
+{
+ apr_status_t rv;
+ h2_proxy_fifo *fifo;
+
+ fifo = apr_pcalloc(pool, sizeof(*fifo));
+ if (fifo == NULL) {
+ return APR_ENOMEM;
+ }
+
+ rv = apr_thread_mutex_create(&fifo->lock,
+ APR_THREAD_MUTEX_UNNESTED, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ rv = apr_thread_cond_create(&fifo->not_empty, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ rv = apr_thread_cond_create(&fifo->not_full, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ fifo->elems = apr_pcalloc(pool, capacity * sizeof(void*));
+ if (fifo->elems == NULL) {
+ return APR_ENOMEM;
+ }
+ fifo->nelems = capacity;
+ fifo->set = as_set;
+
+ *pfifo = fifo;
+ apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null);
+
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_proxy_fifo_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ return create_int(pfifo, pool, capacity, 0);
+}
+
+apr_status_t h2_proxy_fifo_set_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ return create_int(pfifo, pool, capacity, 1);
+}
+
+apr_status_t h2_proxy_fifo_term(h2_proxy_fifo *fifo)
+{
+ apr_status_t rv;
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ fifo->aborted = 1;
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_proxy_fifo_interrupt(h2_proxy_fifo *fifo)
+{
+ apr_status_t rv;
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ apr_thread_cond_broadcast(fifo->not_empty);
+ apr_thread_cond_broadcast(fifo->not_full);
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+int h2_proxy_fifo_count(h2_proxy_fifo *fifo)
+{
+ return fifo->count;
+}
+
+int h2_proxy_fifo_capacity(h2_proxy_fifo *fifo)
+{
+ return fifo->nelems;
+}
+
+static apr_status_t check_not_empty(h2_proxy_fifo *fifo, int block)
+{
+ if (fifo->count == 0) {
+ if (!block) {
+ return APR_EAGAIN;
+ }
+ while (fifo->count == 0) {
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+ apr_thread_cond_wait(fifo->not_empty, fifo->lock);
+ }
+ }
+ return APR_SUCCESS;
+}
+
+static apr_status_t fifo_push(h2_proxy_fifo *fifo, void *elem, int block)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ if (fifo->set && index_of(fifo, elem) >= 0) {
+ /* set mode, elem already member */
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EEXIST;
+ }
+ else if (fifo->count == fifo->nelems) {
+ if (block) {
+ while (fifo->count == fifo->nelems) {
+ if (fifo->aborted) {
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EOF;
+ }
+ apr_thread_cond_wait(fifo->not_full, fifo->lock);
+ }
+ }
+ else {
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EAGAIN;
+ }
+ }
+
+ ap_assert(fifo->count < fifo->nelems);
+ fifo->elems[nth_index(fifo, fifo->count)] = elem;
+ ++fifo->count;
+ if (fifo->count == 1) {
+ apr_thread_cond_broadcast(fifo->not_empty);
+ }
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_proxy_fifo_push(h2_proxy_fifo *fifo, void *elem)
+{
+ return fifo_push(fifo, elem, 1);
+}
+
+apr_status_t h2_proxy_fifo_try_push(h2_proxy_fifo *fifo, void *elem)
+{
+ return fifo_push(fifo, elem, 0);
+}
+
+static void *pull_head(h2_proxy_fifo *fifo)
+{
+ void *elem;
+
+ ap_assert(fifo->count > 0);
+ elem = fifo->elems[fifo->head];
+ --fifo->count;
+ if (fifo->count > 0) {
+ fifo->head = nth_index(fifo, 1);
+ if (fifo->count+1 == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ }
+ return elem;
+}
+
+static apr_status_t fifo_pull(h2_proxy_fifo *fifo, void **pelem, int block)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) {
+ apr_thread_mutex_unlock(fifo->lock);
+ *pelem = NULL;
+ return rv;
+ }
+
+ ap_assert(fifo->count > 0);
+ *pelem = pull_head(fifo);
+
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_proxy_fifo_pull(h2_proxy_fifo *fifo, void **pelem)
+{
+ return fifo_pull(fifo, pelem, 1);
+}
+
+apr_status_t h2_proxy_fifo_try_pull(h2_proxy_fifo *fifo, void **pelem)
+{
+ return fifo_pull(fifo, pelem, 0);
+}
+
+apr_status_t h2_proxy_fifo_remove(h2_proxy_fifo *fifo, void *elem)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ int i, rc;
+ void *e;
+
+ rc = 0;
+ for (i = 0; i < fifo->count; ++i) {
+ e = fifo->elems[nth_index(fifo, i)];
+ if (e == elem) {
+ ++rc;
+ }
+ else if (rc) {
+ fifo->elems[nth_index(fifo, i-rc)] = e;
+ }
+ }
+ if (rc) {
+ fifo->count -= rc;
+ if (fifo->count + rc == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ rv = APR_SUCCESS;
+ }
+ else {
+ rv = APR_EAGAIN;
+ }
+
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
diff --git a/modules/http2/h2_proxy_util.h b/modules/http2/h2_proxy_util.h
index f90d14951b..ea44184256 100644
--- a/modules/http2/h2_proxy_util.h
+++ b/modules/http2/h2_proxy_util.h
@@ -201,4 +201,55 @@ const char *h2_proxy_link_reverse_map(request_rec *r,
const char *proxy_server_uri,
const char *s);
+/*******************************************************************************
+ * FIFO queue
+ ******************************************************************************/
+
+/**
+ * A thread-safe FIFO queue with some extra bells and whistles, if you
+ * do not need anything special, better use 'apr_queue'.
+ */
+typedef struct h2_proxy_fifo h2_proxy_fifo;
+
+/**
+ * Create a FIFO queue that can hold up to capacity elements. Elements can
+ * appear several times.
+ */
+apr_status_t h2_proxy_fifo_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+/**
+ * Create a FIFO set that can hold up to capacity elements. Elements only
+ * appear once. Pushing an element already present does not change the
+ * queue and is successful.
+ */
+apr_status_t h2_proxy_fifo_set_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+apr_status_t h2_proxy_fifo_term(h2_proxy_fifo *fifo);
+apr_status_t h2_proxy_fifo_interrupt(h2_proxy_fifo *fifo);
+
+int h2_proxy_fifo_capacity(h2_proxy_fifo *fifo);
+int h2_proxy_fifo_count(h2_proxy_fifo *fifo);
+
+/**
+ * Push an element into the queue. Blocks if there is no capacity left.
+ *
+ * @param fifo the FIFO queue
+ * @param elem the element to push
+ * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue,
+ * APR_EEXIST when in set mode and elem already there.
+ */
+apr_status_t h2_proxy_fifo_push(h2_proxy_fifo *fifo, void *elem);
+apr_status_t h2_proxy_fifo_try_push(h2_proxy_fifo *fifo, void *elem);
+
+apr_status_t h2_proxy_fifo_pull(h2_proxy_fifo *fifo, void **pelem);
+apr_status_t h2_proxy_fifo_try_pull(h2_proxy_fifo *fifo, void **pelem);
+
+/**
+ * Remove the element from the queue; all occurrences are removed.
+ * @param elem the element to remove
+ * @return APR_SUCCESS iff > 0 elems were removed, APR_EAGAIN otherwise.
+ */
+apr_status_t h2_proxy_fifo_remove(h2_proxy_fifo *fifo, void *elem);
+
+
#endif /* defined(__mod_h2__h2_proxy_util__) */
diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c
index f37741b61f..e23cb8d54b 100644
--- a/modules/http2/h2_session.c
+++ b/modules/http2/h2_session.c
@@ -72,7 +72,7 @@ static int h2_session_status_from_apr_status(apr_status_t rv)
return NGHTTP2_ERR_PROTO;
}
-static h2_stream *get_stream(h2_session *session, int stream_id)
+h2_stream *h2_session_stream_get(h2_session *session, int stream_id)
{
return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
}
@@ -231,7 +231,7 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
h2_stream * stream;
int rv = 0;
- stream = get_stream(session, stream_id);
+ stream = h2_session_stream_get(session, stream_id);
if (stream) {
status = h2_stream_recv_DATA(stream, flags, data, len);
}
@@ -256,7 +256,7 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
h2_stream *stream;
(void)ngh2;
- stream = get_stream(session, stream_id);
+ stream = h2_session_stream_get(session, stream_id);
if (stream) {
if (error_code) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
@@ -278,7 +278,7 @@ static int on_begin_headers_cb(nghttp2_session *ngh2,
/* We may see HEADERs at the start of a stream or after all DATA
* streams to carry trailers. */
(void)ngh2;
- s = get_stream(session, frame->hd.stream_id);
+ s = h2_session_stream_get(session, frame->hd.stream_id);
if (s) {
/* nop */
}
@@ -299,7 +299,7 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
apr_status_t status;
(void)flags;
- stream = get_stream(session, frame->hd.stream_id);
+ stream = h2_session_stream_get(session, frame->hd.stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(02920)
"h2_stream(%ld-%d): on_header unknown stream",
@@ -344,13 +344,13 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
/* This can be HEADERS for a new stream, defining the request,
* or HEADER may come after DATA at the end of a stream as in
* trailers */
- stream = get_stream(session, frame->hd.stream_id);
+ stream = h2_session_stream_get(session, frame->hd.stream_id);
if (stream) {
h2_stream_recv_frame(stream, NGHTTP2_HEADERS, frame->hd.flags);
}
break;
case NGHTTP2_DATA:
- stream = get_stream(session, frame->hd.stream_id);
+ stream = h2_session_stream_get(session, frame->hd.stream_id);
if (stream) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
H2_STRM_LOG(APLOGNO(02923), stream,
@@ -380,7 +380,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
"h2_stream(%ld-%d): RST_STREAM by client, errror=%d",
session->id, (int)frame->hd.stream_id,
(int)frame->rst_stream.error_code);
- stream = get_stream(session, frame->hd.stream_id);
+ stream = h2_session_stream_get(session, frame->hd.stream_id);
if (stream && stream->initiated_on) {
++session->pushes_reset;
}
@@ -453,7 +453,7 @@ static int on_send_data_cb(nghttp2_session *ngh2,
}
padlen = (unsigned char)frame->data.padlen;
- stream = get_stream(session, stream_id);
+ stream = h2_session_stream_get(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
APLOGNO(02924)
@@ -542,7 +542,7 @@ static int on_frame_send_cb(nghttp2_session *ngh2,
(long)session->frames_sent);
}
- stream = get_stream(session, stream_id);
+ stream = h2_session_stream_get(session, stream_id);
if (stream) {
h2_stream_send_frame(stream, frame->hd.type, frame->hd.flags);
}
@@ -566,7 +566,7 @@ static int on_invalid_header_cb(nghttp2_session *ngh2,
apr_pstrndup(session->pool, (const char *)name, namelen),
apr_pstrndup(session->pool, (const char *)value, valuelen));
}
- stream = get_stream(session, frame->hd.stream_id);
+ stream = h2_session_stream_get(session, frame->hd.stream_id);
if (stream) {
h2_stream_rst(stream, NGHTTP2_PROTOCOL_ERROR);
}
@@ -1028,7 +1028,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
(void)ng2s;
(void)buf;
(void)source;
- stream = get_stream(session, stream_id);
+ stream = h2_session_stream_get(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02937)
@@ -1449,7 +1449,7 @@ static void h2_session_in_flush(h2_session *session)
int id;
while ((id = h2_iq_shift(session->in_process)) > 0) {
- h2_stream *stream = get_stream(session, id);
+ h2_stream *stream = h2_session_stream_get(session, id);
if (stream) {
ap_assert(!stream->scheduled);
if (h2_stream_prep_processing(stream) == APR_SUCCESS) {
@@ -1462,7 +1462,7 @@ static void h2_session_in_flush(h2_session *session)
}
while ((id = h2_iq_shift(session->in_pending)) > 0) {
- h2_stream *stream = get_stream(session, id);
+ h2_stream *stream = h2_session_stream_get(session, id);
if (stream) {
h2_stream_flush_input(stream);
}
diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h
index 5751aed7bd..7a3ca3ca38 100644
--- a/modules/http2/h2_session.h
+++ b/modules/http2/h2_session.h
@@ -195,6 +195,11 @@ void h2_session_close(h2_session *session);
int h2_session_push_enabled(h2_session *session);
/**
+ * Look up the stream in this session with the given id.
+ */
+struct h2_stream *h2_session_stream_get(h2_session *session, int stream_id);
+
+/**
* Submit a push promise on the stream and schedule the new steam for
* processing..
*
diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c
index 7bf35aa3b2..9784b4ec28 100644
--- a/modules/http2/h2_stream.c
+++ b/modules/http2/h2_stream.c
@@ -764,18 +764,77 @@ static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
return NULL;
}
+static apr_status_t add_data(h2_stream *stream, apr_off_t requested,
+ apr_off_t *plen, int *peos, int *complete,
+ h2_headers **pheaders)
+{
+ apr_bucket *b, *e;
+
+ *peos = 0;
+ *plen = 0;
+ *complete = 0;
+ if (pheaders) {
+ *pheaders = NULL;
+ }
+
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "add_data");
+ b = APR_BRIGADE_FIRST(stream->out_buffer);
+ while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
+ e = APR_BUCKET_NEXT(b);
+ if (APR_BUCKET_IS_METADATA(b)) {
+ if (APR_BUCKET_IS_FLUSH(b)) {
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ }
+ else if (APR_BUCKET_IS_EOS(b)) {
+ *peos = 1;
+ return APR_SUCCESS;
+ }
+ else if (H2_BUCKET_IS_HEADERS(b)) {
+ if (*plen > 0) {
+ /* data before the response, can only return up to here */
+ return APR_SUCCESS;
+ }
+ else if (pheaders) {
+ *pheaders = h2_bucket_headers_get(b);
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ H2_STRM_MSG(stream, "prep, -> response %d"),
+ (*pheaders)->status);
+ return APR_SUCCESS;
+ }
+ else {
+ return APR_EAGAIN;
+ }
+ }
+ }
+ else if (b->length == 0) {
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ }
+ else {
+ ap_assert(b->length != (apr_size_t)-1);
+ *plen += b->length;
+ if (*plen >= requested) {
+ *plen = requested;
+ return APR_SUCCESS;
+ }
+ }
+ b = e;
+ }
+ *complete = 1;
+ return APR_SUCCESS;
+}
+
apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
- int *peos, h2_headers **presponse)
+ int *peos, h2_headers **pheaders)
{
apr_status_t status = APR_SUCCESS;
- apr_off_t requested, max_chunk = H2_DATA_CHUNK_SIZE;
- apr_bucket *b, *e;
+ apr_off_t requested, missing, max_chunk = H2_DATA_CHUNK_SIZE;
conn_rec *c;
+ int complete;
- if (presponse) {
- *presponse = NULL;
- }
-
ap_assert(stream);
if (stream->rst_error) {
@@ -793,15 +852,34 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
if (stream->session->io.write_size > 0) {
max_chunk = stream->session->io.write_size - 9; /* header bits */
}
- *plen = requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
+ requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
+
+ /* count the buffered data until eos or a headers bucket */
+ status = add_data(stream, requested, plen, peos, &complete, pheaders);
+
+ if (status == APR_EAGAIN) {
+ /* TODO: ugly, someone needs to retrieve the response first */
+ h2_mplx_keep_active(stream->session->mplx, stream);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ H2_STRM_MSG(stream, "prep, response eagain"));
+ return status;
+ }
+ else if (status != APR_SUCCESS) {
+ return status;
+ }
- h2_util_bb_avail(stream->out_buffer, plen, peos);
- if (!*peos && *plen < requested && *plen < stream->max_mem) {
- H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
+ if (pheaders && *pheaders) {
+ return APR_SUCCESS;
+ }
+
+ missing = H2MIN(requested, stream->max_mem) - *plen;
+ if (complete && !*peos && missing > 0) {
if (stream->output) {
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
status = h2_beam_receive(stream->output, stream->out_buffer,
APR_NONBLOCK_READ,
stream->max_mem - *plen);
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
}
else {
status = APR_EOF;
@@ -810,79 +888,24 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
if (APR_STATUS_IS_EOF(status)) {
apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
+ *peos = 1;
status = APR_SUCCESS;
}
- else if (status == APR_EAGAIN) {
- status = APR_SUCCESS;
- }
- *plen = requested;
- h2_util_bb_avail(stream->out_buffer, plen, peos);
- H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
- }
- else {
- H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "ok");
- }
-
- b = APR_BRIGADE_FIRST(stream->out_buffer);
- while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
- e = APR_BUCKET_NEXT(b);
- if (APR_BUCKET_IS_FLUSH(b)
- || (!APR_BUCKET_IS_METADATA(b) && b->length == 0)) {
- APR_BUCKET_REMOVE(b);
- apr_bucket_destroy(b);
- }
- else {
- break;
+ else if (status == APR_SUCCESS) {
+ /* do it again, now that we have gotten more */
+ status = add_data(stream, requested, plen, peos, &complete, pheaders);
}
- b = e;
}
-
- b = get_first_headers_bucket(stream->out_buffer);
- if (b) {
- /* there are HEADERS to submit */
- *peos = 0;
- *plen = 0;
- if (b == APR_BRIGADE_FIRST(stream->out_buffer)) {
- if (presponse) {
- *presponse = h2_bucket_headers_get(b);
- APR_BUCKET_REMOVE(b);
- apr_bucket_destroy(b);
- status = APR_SUCCESS;
- }
- else {
- /* someone needs to retrieve the response first */
- h2_mplx_keep_active(stream->session->mplx, stream->id);
- status = APR_EAGAIN;
- }
- }
- else {
- apr_bucket *e = APR_BRIGADE_FIRST(stream->out_buffer);
- while (e != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
- if (e == b) {
- break;
- }
- else if (e->length != (apr_size_t)-1) {
- *plen += e->length;
- }
- e = APR_BUCKET_NEXT(e);
- }
- }
- }
-
+
if (status == APR_SUCCESS) {
- if (presponse && *presponse) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- H2_STRM_MSG(stream, "prepare, response %d"),
- (*presponse)->status);
- }
- else if (*peos || *plen) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+ if (*peos || *plen) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
H2_STRM_MSG(stream, "prepare, len=%ld eos=%d"),
(long)*plen, *peos);
}
else {
status = APR_EAGAIN;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
H2_STRM_MSG(stream, "prepare, no data"));
}
}
diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c
index 5ab485faab..1ef0d9a887 100644
--- a/modules/http2/h2_task.c
+++ b/modules/http2/h2_task.c
@@ -383,7 +383,7 @@ static apr_status_t h2_filter_parse_h1(ap_filter_t* f, apr_bucket_brigade* bb)
/* There are cases where we need to parse a serialized http/1.1
* response. One example is a 100-continue answer in serialized mode
* or via a mod_proxy setup */
- while (!task->output.sent_response) {
+ while (bb && !task->output.sent_response) {
status = h2_from_h1_parse_response(task, f, bb);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
"h2_task(%s): parsed response", task->id);
diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c
index 0389193e88..0ac65ccf65 100644
--- a/modules/http2/h2_util.c
+++ b/modules/http2/h2_util.c
@@ -438,12 +438,12 @@ int h2_iq_count(h2_iqueue *q)
}
-void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
+int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
{
int i;
if (h2_iq_contains(q, sid)) {
- return;
+ return 0;
}
if (q->nelts >= q->nalloc) {
iq_grow(q, q->nalloc * 2);
@@ -456,11 +456,12 @@ void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
/* bubble it to the front of the queue */
iq_bubble_up(q, i, q->head, cmp, ctx);
}
+ return 1;
}
-void h2_iq_append(h2_iqueue *q, int sid)
+int h2_iq_append(h2_iqueue *q, int sid)
{
- h2_iq_add(q, sid, NULL, NULL);
+ return h2_iq_add(q, sid, NULL, NULL);
}
int h2_iq_remove(h2_iqueue *q, int sid)
@@ -612,6 +613,7 @@ int h2_iq_contains(h2_iqueue *q, int sid)
struct h2_fifo {
void **elems;
int nelems;
+ int set;
int head;
int count;
int aborted;
@@ -636,7 +638,20 @@ static apr_status_t fifo_destroy(void *data)
return APR_SUCCESS;
}
-apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+static int index_of(h2_fifo *fifo, void *elem)
+{
+ int i;
+
+ for (i = 0; i < fifo->count; ++i) {
+ if (elem == fifo->elems[nth_index(fifo, i)]) {
+ return i;
+ }
+ }
+ return -1;
+}
+
+static apr_status_t create_int(h2_fifo **pfifo, apr_pool_t *pool,
+ int capacity, int as_set)
{
apr_status_t rv;
h2_fifo *fifo;
@@ -667,6 +682,7 @@ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
return APR_ENOMEM;
}
fifo->nelems = capacity;
+ fifo->set = as_set;
*pfifo = fifo;
apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null);
@@ -674,6 +690,16 @@ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
return APR_SUCCESS;
}
+apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ return create_int(pfifo, pool, capacity, 0);
+}
+
+apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ return create_int(pfifo, pool, capacity, 1);
+}
+
apr_status_t h2_fifo_term(h2_fifo *fifo)
{
apr_status_t rv;
@@ -725,7 +751,12 @@ static apr_status_t fifo_push(h2_fifo *fifo, void *elem, int block)
}
if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
- if (fifo->count == fifo->nelems) {
+ if (fifo->set && index_of(fifo, elem) >= 0) {
+ /* set mode, elem already member */
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EEXIST;
+ }
+ else if (fifo->count == fifo->nelems) {
if (block) {
while (fifo->count == fifo->nelems) {
if (fifo->aborted) {
@@ -762,6 +793,22 @@ apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem)
return fifo_push(fifo, elem, 0);
}
+static void *pull_head(h2_fifo *fifo)
+{
+ void *elem;
+
+ ap_assert(fifo->count > 0);
+ elem = fifo->elems[fifo->head];
+ --fifo->count;
+ if (fifo->count > 0) {
+ fifo->head = nth_index(fifo, 1);
+ if (fifo->count+1 == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ }
+ return elem;
+}
+
static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block)
{
apr_status_t rv;
@@ -778,14 +825,8 @@ static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block)
}
ap_assert(fifo->count > 0);
- *pelem = fifo->elems[fifo->head];
- --fifo->count;
- if (fifo->count > 0) {
- fifo->head = nth_index(fifo, 1);
- if (fifo->count+1 == fifo->nelems) {
- apr_thread_cond_broadcast(fifo->not_full);
- }
- }
+ *pelem = pull_head(fifo);
+
apr_thread_mutex_unlock(fifo->lock);
}
return rv;
@@ -817,29 +858,18 @@ static apr_status_t fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx, int
}
ap_assert(fifo->count > 0);
- elem = fifo->elems[fifo->head];
+ elem = pull_head(fifo);
+ apr_thread_mutex_unlock(fifo->lock);
+
switch (fn(elem, ctx)) {
case H2_FIFO_OP_PULL:
- --fifo->count;
- if (fifo->count > 0) {
- fifo->head = nth_index(fifo, 1);
- if (fifo->count+1 == fifo->nelems) {
- apr_thread_cond_broadcast(fifo->not_full);
- }
- }
break;
case H2_FIFO_OP_REPUSH:
- if (fifo->count > 1) {
- fifo->head = nth_index(fifo, 1);
- if (fifo->count < fifo->nelems) {
- fifo->elems[nth_index(fifo, fifo->count-1)] = elem;
- }
- }
+ return h2_fifo_push(fifo, elem);
break;
}
- apr_thread_mutex_unlock(fifo->lock);
}
return rv;
}
diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h
index f6a4b9a43d..9b408fad3d 100644
--- a/modules/http2/h2_util.h
+++ b/modules/http2/h2_util.h
@@ -119,17 +119,19 @@ int h2_iq_count(h2_iqueue *q);
* @param q the queue to append the id to
* @param sid the stream id to add
* @param cmp the comparator for sorting
- * @param ctx user data for comparator
+ * @param ctx user data for comparator
+ * @return != 0 iff id was not already there
*/
-void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
+int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
/**
* Append the id to the queue if not already present.
*
* @param q the queue to append the id to
* @param sid the id to append
+ * @return != 0 iff id was not already there
*/
-void h2_iq_append(h2_iqueue *q, int sid);
+int h2_iq_append(h2_iqueue *q, int sid);
/**
* Remove the stream id from the queue. Return != 0 iff task
@@ -193,12 +195,32 @@ int h2_iq_contains(h2_iqueue *q, int sid);
*/
typedef struct h2_fifo h2_fifo;
+/**
+ * Create a FIFO queue that can hold up to capacity elements. Elements can
+ * appear several times.
+ */
apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+/**
+ * Create a FIFO set that can hold up to capacity elements. Elements only
+ * appear once. Pushing an element already present does not change the
+ * queue and is successful.
+ */
+apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity);
+
apr_status_t h2_fifo_term(h2_fifo *fifo);
apr_status_t h2_fifo_interrupt(h2_fifo *fifo);
int h2_fifo_count(h2_fifo *fifo);
+/**
+ * Push an element into the queue. Blocks if there is no capacity left.
+ *
+ * @param fifo the FIFO queue
+ * @param elem the element to push
+ * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue,
+ * APR_EEXIST when in set mode and elem already there.
+ */
apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem);
apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem);
diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h
index e6765e5a03..528d21aed7 100644
--- a/modules/http2/h2_version.h
+++ b/modules/http2/h2_version.h
@@ -26,7 +26,7 @@
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.10.0"
+#define MOD_HTTP2_VERSION "1.10.1"
/**
* @macro
@@ -34,7 +34,7 @@
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010a00
+#define MOD_HTTP2_VERSION_NUM 0x010a01
#endif /* mod_h2_h2_version_h */
diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c
index fa395255e9..9c7afc64e6 100644
--- a/modules/http2/h2_workers.c
+++ b/modules/http2/h2_workers.c
@@ -39,6 +39,7 @@ struct h2_slot {
int sticks;
h2_task *task;
apr_thread_t *thread;
+ apr_thread_mutex_t *lock;
apr_thread_cond_t *not_idle;
};
@@ -78,6 +79,17 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
slot->workers = workers;
slot->aborted = 0;
slot->task = NULL;
+
+ if (!slot->lock) {
+ status = apr_thread_mutex_create(&slot->lock,
+ APR_THREAD_MUTEX_DEFAULT,
+ workers->pool);
+ if (status != APR_SUCCESS) {
+ push_slot(&workers->free, slot);
+ return status;
+ }
+ }
+
if (!slot->not_idle) {
status = apr_thread_cond_create(&slot->not_idle, workers->pool);
if (status != APR_SUCCESS) {
@@ -95,7 +107,7 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
return APR_ENOMEM;
}
- ++workers->worker_count;
+ apr_atomic_inc32(&workers->worker_count);
return APR_SUCCESS;
}
@@ -112,9 +124,9 @@ static void wake_idle_worker(h2_workers *workers)
{
h2_slot *slot = pop_slot(&workers->idle);
if (slot) {
- apr_thread_mutex_lock(workers->lock);
+ apr_thread_mutex_lock(slot->lock);
apr_thread_cond_signal(slot->not_idle);
- apr_thread_mutex_unlock(workers->lock);
+ apr_thread_mutex_unlock(slot->lock);
}
else if (workers->dynamic) {
add_worker(workers);
@@ -130,7 +142,7 @@ static void cleanup_zombies(h2_workers *workers)
apr_thread_join(&status, slot->thread);
slot->thread = NULL;
}
- --workers->worker_count;
+ apr_atomic_dec32(&workers->worker_count);
push_slot(&workers->free, slot);
}
}
@@ -185,15 +197,12 @@ static apr_status_t get_next(h2_slot *slot)
return APR_SUCCESS;
}
- apr_thread_mutex_lock(workers->lock);
cleanup_zombies(workers);
- ++workers->idle_workers;
+ apr_thread_mutex_lock(slot->lock);
push_slot(&workers->idle, slot);
- apr_thread_cond_wait(slot->not_idle, workers->lock);
- --workers->idle_workers;
-
- apr_thread_mutex_unlock(workers->lock);
+ apr_thread_cond_wait(slot->not_idle, slot->lock);
+ apr_thread_mutex_unlock(slot->lock);
}
return APR_EOF;
}
@@ -239,24 +248,25 @@ static apr_status_t workers_pool_cleanup(void *data)
h2_slot *slot;
if (!workers->aborted) {
- apr_thread_mutex_lock(workers->lock);
workers->aborted = 1;
- /* before we go, cleanup any zombies and abort the rest */
- cleanup_zombies(workers);
+ /* abort all idle slots */
for (;;) {
slot = pop_slot(&workers->idle);
if (slot) {
+ apr_thread_mutex_lock(slot->lock);
slot->aborted = 1;
apr_thread_cond_signal(slot->not_idle);
+ apr_thread_mutex_unlock(slot->lock);
}
else {
break;
}
}
- apr_thread_mutex_unlock(workers->lock);
h2_fifo_term(workers->mplxs);
h2_fifo_interrupt(workers->mplxs);
+
+ cleanup_zombies(workers);
}
return APR_SUCCESS;
}
diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h
index 30a7514cd0..7964b3c3aa 100644
--- a/modules/http2/h2_workers.h
+++ b/modules/http2/h2_workers.h
@@ -40,8 +40,6 @@ struct h2_workers {
int next_worker_id;
int min_workers;
int max_workers;
- int worker_count;
- int idle_workers;
int max_idle_secs;
int aborted;
@@ -51,6 +49,8 @@ struct h2_workers {
int nslots;
struct h2_slot *slots;
+ volatile apr_uint32_t worker_count;
+
struct h2_slot *free;
struct h2_slot *idle;
struct h2_slot *zombies;
diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c
index ef23d0c428..5b2a798996 100644
--- a/modules/http2/mod_proxy_http2.c
+++ b/modules/http2/mod_proxy_http2.c
@@ -26,6 +26,8 @@
#include "h2_version.h"
#include "h2_proxy_session.h"
+#define H2MIN(x,y) ((x) < (y) ? (x) : (y))
+
static void register_hook(apr_pool_t *p);
AP_DECLARE_MODULE(proxy_http2) = {
@@ -65,7 +67,7 @@ typedef struct h2_proxy_ctx {
const char *engine_type;
apr_pool_t *engine_pool;
apr_size_t req_buffer_size;
- request_rec *next;
+ h2_proxy_fifo *requests;
int capacity;
unsigned standalone : 1;
@@ -218,36 +220,23 @@ static apr_status_t proxy_engine_init(h2_req_engine *engine,
{
h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config,
&proxy_http2_module);
- if (ctx) {
- conn_rec *c = ctx->owner;
- h2_proxy_ctx *nctx;
-
- /* we need another lifetime for this. If we do not host
- * an engine, the context lives in r->pool. Since we expect
- * to server more than r, we need to live longer */
- nctx = apr_pcalloc(pool, sizeof(*nctx));
- if (nctx == NULL) {
- return APR_ENOMEM;
- }
- memcpy(nctx, ctx, sizeof(*nctx));
- ctx = nctx;
- ctx->pool = pool;
- ctx->engine = engine;
- ctx->engine_id = id;
- ctx->engine_type = type;
- ctx->engine_pool = pool;
- ctx->req_buffer_size = req_buffer_size;
- ctx->capacity = 100;
-
- ap_set_module_config(c->conn_config, &proxy_http2_module, ctx);
-
- *pconsumed = out_consumed;
- *pctx = ctx;
- return APR_SUCCESS;
+ if (!ctx) {
+ ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(03368)
+ "h2_proxy_session, engine init, no ctx found");
+ return APR_ENOTIMPL;
}
- ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(03368)
- "h2_proxy_session, engine init, no ctx found");
- return APR_ENOTIMPL;
+
+ ctx->pool = pool;
+ ctx->engine = engine;
+ ctx->engine_id = id;
+ ctx->engine_type = type;
+ ctx->engine_pool = pool;
+ ctx->req_buffer_size = req_buffer_size;
+ ctx->capacity = H2MIN(100, h2_proxy_fifo_capacity(ctx->requests));
+
+ *pconsumed = out_consumed;
+ *pctx = ctx;
+ return APR_SUCCESS;
}
static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
@@ -270,10 +259,9 @@ static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
return status;
}
-static void request_done(h2_proxy_session *session, request_rec *r,
+static void request_done(h2_proxy_ctx *ctx, request_rec *r,
apr_status_t status, int touched)
{
- h2_proxy_ctx *ctx = session->user_data;
const char *task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection,
@@ -282,35 +270,26 @@ static void request_done(h2_proxy_session *session, request_rec *r,
if (status != APR_SUCCESS) {
if (!touched) {
/* untouched request, need rescheduling */
- if (req_engine_push && is_h2 && is_h2(ctx->owner)) {
- if (req_engine_push(ctx->engine_type, r, NULL) == APR_SUCCESS) {
- /* push to engine */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
- APLOGNO(03369)
- "h2_proxy_session(%s): rescheduled request %s",
- ctx->engine_id, task_id);
- return;
- }
- }
- else if (!ctx->next) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection,
- "h2_proxy_session(%s): retry untouched request",
- ctx->engine_id);
- ctx->next = r;
- }
+ status = h2_proxy_fifo_push(ctx->requests, r);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection,
+ APLOGNO(03369)
+ "h2_proxy_session(%s): rescheduled request %s",
+ ctx->engine_id, task_id);
+ return;
}
else {
const char *uri;
uri = apr_uri_unparse(r->pool, &r->parsed_uri, 0);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection,
APLOGNO(03471) "h2_proxy_session(%s): request %s -> %s "
- "not complete, was touched",
+ "not complete, cannot repeat",
ctx->engine_id, task_id, uri);
}
}
if (r == ctx->rbase) {
- ctx->r_status = (status == APR_SUCCESS)? APR_SUCCESS : HTTP_SERVICE_UNAVAILABLE;
+ ctx->r_status = ((status == APR_SUCCESS)? APR_SUCCESS
+ : HTTP_SERVICE_UNAVAILABLE);
}
if (req_engine_done && ctx->engine) {
@@ -322,21 +301,32 @@ static void request_done(h2_proxy_session *session, request_rec *r,
}
}
+static void session_req_done(h2_proxy_session *session, request_rec *r,
+ apr_status_t status, int touched)
+{
+ request_done(session->user_data, r, status, touched);
+}
+
static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave)
{
- if (ctx->next) {
+ if (h2_proxy_fifo_count(ctx->requests) > 0) {
return APR_SUCCESS;
}
else if (req_engine_pull && ctx->engine) {
apr_status_t status;
+ request_rec *r = NULL;
+
status = req_engine_pull(ctx->engine, before_leave?
APR_BLOCK_READ: APR_NONBLOCK_READ,
- ctx->capacity, &ctx->next);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner,
- "h2_proxy_engine(%s): pulled request (%s) %s",
- ctx->engine_id,
- before_leave? "before leave" : "regular",
- (ctx->next? ctx->next->the_request : "NULL"));
+ ctx->capacity, &r);
+ if (status == APR_SUCCESS && r) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner,
+ "h2_proxy_engine(%s): pulled request (%s) %s",
+ ctx->engine_id,
+ before_leave? "before leave" : "regular",
+ r->the_request);
+ h2_proxy_fifo_push(ctx->requests, r);
+ }
return APR_STATUS_IS_EAGAIN(status)? APR_SUCCESS : status;
}
return APR_EOF;
@@ -345,6 +335,7 @@ static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave)
static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
apr_status_t status = OK;
int h2_front;
+ request_rec *r;
/* Step Four: Send the Request in a new HTTP/2 stream and
* loop until we got the response or encounter errors.
@@ -355,7 +346,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
ctx->session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf,
h2_front, 30,
h2_proxy_log2((int)ctx->req_buffer_size),
- request_done);
+ session_req_done);
if (!ctx->session) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner,
APLOGNO(03372) "session unavailable");
@@ -366,10 +357,9 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
"eng(%s): run session %s", ctx->engine_id, ctx->session->id);
ctx->session->user_data = ctx;
- while (1) {
- if (ctx->next) {
- add_request(ctx->session, ctx->next);
- ctx->next = NULL;
+ while (!ctx->owner->aborted) {
+ if (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) {
+ add_request(ctx->session, r);
}
status = h2_proxy_session_process(ctx->session);
@@ -379,7 +369,8 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
/* ongoing processing, call again */
if (ctx->session->remote_max_concurrent > 0
&& ctx->session->remote_max_concurrent != ctx->capacity) {
- ctx->capacity = (int)ctx->session->remote_max_concurrent;
+ ctx->capacity = H2MIN((int)ctx->session->remote_max_concurrent,
+ h2_proxy_fifo_capacity(ctx->requests));
}
s2 = next_request(ctx, 0);
if (s2 == APR_ECONNABORTED) {
@@ -395,7 +386,8 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
status = ctx->r_status = APR_SUCCESS;
break;
}
- if (!ctx->next && h2_proxy_ihash_empty(ctx->session->streams)) {
+ if ((h2_proxy_fifo_count(ctx->requests) == 0)
+ && h2_proxy_ihash_empty(ctx->session->streams)) {
break;
}
}
@@ -409,7 +401,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
* a) be reopened on the new session iff safe to do so
* b) reported as done (failed) otherwise
*/
- h2_proxy_session_cleanup(ctx->session, request_done);
+ h2_proxy_session_cleanup(ctx->session, session_req_done);
break;
}
}
@@ -420,7 +412,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
return status;
}
-static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx)
+static apr_status_t push_request_somewhere(h2_proxy_ctx *ctx, request_rec *r)
{
conn_rec *c = ctx->owner;
const char *engine_type, *hostname;
@@ -430,21 +422,15 @@ static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx)
engine_type = apr_psprintf(ctx->pool, "proxy_http2 %s%s", hostname,
ctx->server_portstr);
- if (c->master && req_engine_push && ctx->next && is_h2 && is_h2(c)) {
+ if (c->master && req_engine_push && r && is_h2 && is_h2(c)) {
/* If we are have req_engine capabilities, push the handling of this
* request (e.g. slave connection) to a proxy_http2 engine which
* uses the same backend. We may be called to create an engine
* ourself. */
- if (req_engine_push(engine_type, ctx->next, proxy_engine_init)
- == APR_SUCCESS) {
- /* to renew the lifetime, we might have set a new ctx */
- ctx = ap_get_module_config(c->conn_config, &proxy_http2_module);
+ if (req_engine_push(engine_type, r, proxy_engine_init) == APR_SUCCESS) {
if (ctx->engine == NULL) {
- /* Another engine instance has taken over processing of this
- * request. */
- ctx->r_status = SUSPENDED;
- ctx->next = NULL;
- return ctx;
+ /* request has been assigned to an engine in another thread */
+ return SUSPENDED;
}
}
}
@@ -465,7 +451,8 @@ static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx)
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"H2: hosting engine %s", ctx->engine_id);
}
- return ctx;
+
+ return h2_proxy_fifo_push(ctx->requests, r);
}
static int proxy_http2_handler(request_rec *r,
@@ -482,7 +469,7 @@ static int proxy_http2_handler(request_rec *r,
apr_status_t status;
h2_proxy_ctx *ctx;
apr_uri_t uri;
- int reconnected = 0;
+ int reconnects = 0;
/* find the scheme */
if ((url[0] != 'h' && url[0] != 'H') || url[1] != '2') {
@@ -507,6 +494,7 @@ static int proxy_http2_handler(request_rec *r,
default:
return DECLINED;
}
+
ctx = apr_pcalloc(r->pool, sizeof(*ctx));
ctx->owner = r->connection;
ctx->pool = r->pool;
@@ -518,8 +506,9 @@ static int proxy_http2_handler(request_rec *r,
ctx->conf = conf;
ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
ctx->r_status = HTTP_SERVICE_UNAVAILABLE;
- ctx->next = r;
- r = NULL;
+
+ h2_proxy_fifo_set_create(&ctx->requests, ctx->pool, 100);
+
ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, ctx);
/* scheme says, this is for us. */
@@ -565,10 +554,11 @@ run_connect:
/* If we are not already hosting an engine, try to push the request
* to an already existing engine or host a new engine here. */
- if (!ctx->engine) {
- ctx = push_request_somewhere(ctx);
+ if (r && !ctx->engine) {
+ ctx->r_status = push_request_somewhere(ctx, r);
+ r = NULL;
if (ctx->r_status == SUSPENDED) {
- /* request was pushed to another engine */
+ /* request was pushed to another thread, leave processing here */
goto cleanup;
}
}
@@ -581,7 +571,7 @@ run_connect:
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03352)
"H2: failed to make connection to backend: %s",
ctx->p_conn->hostname);
- goto cleanup;
+ goto reconnect;
}
/* Step Three: Create conn_rec for the socket we have open now. */
@@ -593,7 +583,7 @@ run_connect:
"setup new connection: is_ssl=%d %s %s %s",
ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname,
locurl, ctx->p_conn->hostname);
- goto cleanup;
+ goto reconnect;
}
if (!ctx->p_conn->data) {
@@ -628,8 +618,8 @@ run_session:
ctx->engine = NULL;
}
-cleanup:
- if (!reconnected && next_request(ctx, 1) == APR_SUCCESS) {
+reconnect:
+ if (next_request(ctx, 1) == APR_SUCCESS) {
/* Still more to do, tear down old conn and start over */
if (ctx->p_conn) {
ctx->p_conn->close = 1;
@@ -638,10 +628,16 @@ cleanup:
ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
ctx->p_conn = NULL;
}
- reconnected = 1; /* we do this only once, then fail */
- goto run_connect;
+ ++reconnects;
+ if (reconnects < 5 && !ctx->owner->aborted) {
+ goto run_connect;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(10023)
+ "giving up after %d reconnects, %d requests todo",
+ reconnects, h2_proxy_fifo_count(ctx->requests));
}
+cleanup:
if (ctx->p_conn) {
if (status != APR_SUCCESS) {
/* close socket when errors happened or session shut down (EOF) */
@@ -653,6 +649,11 @@ cleanup:
ctx->p_conn = NULL;
}
+    /* Any requests still queued could not be processed; fail them now */
+ while (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) {
+ request_done(ctx, r, HTTP_SERVICE_UNAVAILABLE, 1);
+ }
+
ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, NULL);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner,
APLOGNO(03377) "leaving handler");