summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Eissing <icing@apache.org>2017-03-30 16:05:06 +0000
committerStefan Eissing <icing@apache.org>2017-03-30 16:05:06 +0000
commit5440a7562b4a53658a17592b5b1ec16f88df5484 (patch)
tree5a75beae121178dfc656c2c53f677b25ac8edea9
parentf56036523cb52363e4ddc5eaefdc339beb99898e (diff)
downloadhttpd-5440a7562b4a53658a17592b5b1ec16f88df5484.tar.gz
On the trunk:
mod_http2: move stuff from master connection to worker threads, increase spare slave connections, create output beams in worker when needed. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1789535 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--CHANGES4
-rw-r--r--modules/http2/h2_bucket_beam.c3
-rw-r--r--modules/http2/h2_bucket_beam.h11
-rw-r--r--modules/http2/h2_conn.c2
-rw-r--r--modules/http2/h2_mplx.c118
-rw-r--r--modules/http2/h2_mplx.h6
-rw-r--r--modules/http2/h2_session.c4
-rw-r--r--modules/http2/h2_stream.c24
-rw-r--r--modules/http2/h2_task.c83
-rw-r--r--modules/http2/h2_task.h7
10 files changed, 156 insertions, 106 deletions
diff --git a/CHANGES b/CHANGES
index e219f7ad10..8150e09f38 100644
--- a/CHANGES
+++ b/CHANGES
@@ -2,7 +2,9 @@
Changes with Apache 2.5.0
*) mod_http2: better performance, eliminated need for nested locks and
- thread privates. [Stefan Eissing]
+ thread privates. Moving request setups from the main connection to the
+ worker threads. Increase number of spare connections kept.
+ [Stefan Eissing]
*) core: Disallow multiple Listen on the same IP:port when listener buckets
are configured (ListenCoresBucketsRatio > 0), consistently with the single
diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c
index 7fb6cb6f79..17ad3d95f1 100644
--- a/modules/http2/h2_bucket_beam.c
+++ b/modules/http2/h2_bucket_beam.c
@@ -832,11 +832,10 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
apr_bucket_file *bf = b->data;
apr_file_t *fd = bf->fd;
int can_beam = (bf->refcount.refcount == 1);
- if (can_beam && beam->last_beamed != fd && beam->can_beam_fn) {
+ if (can_beam && beam->can_beam_fn) {
can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
}
if (can_beam) {
- beam->last_beamed = fd;
status = apr_bucket_setaside(b, beam->send_pool);
}
/* else: enter ENOTIMPL case below */
diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h
index 18bc32629f..64117ff159 100644
--- a/modules/http2/h2_bucket_beam.h
+++ b/modules/http2/h2_bucket_beam.h
@@ -183,7 +183,6 @@ struct h2_bucket_beam {
apr_size_t buckets_sent; /* # of beam buckets sent */
apr_size_t files_beamed; /* how many file handles have been set aside */
- apr_file_t *last_beamed; /* last file beamed */
unsigned int aborted : 1;
unsigned int closed : 1;
@@ -376,6 +375,16 @@ int h2_beam_report_consumption(h2_bucket_beam *beam);
void h2_beam_on_produced(h2_bucket_beam *beam,
h2_beam_io_callback *io_cb, void *ctx);
+/**
+ * Register a callback that may prevent a file from being beam as
+ * file handle, forcing the file content to be copied. Then no callback
+ * is set (NULL), file handles are transferred directly.
+ * @param beam the beam to set the callback on
+ * @param io_cb the callback or NULL, called on receiver with bytes produced
+ * @param ctx the context to use in callback invocation
+ *
+ * Call from the receiver side, callbacks invoked on either side.
+ */
void h2_beam_on_file_beam(h2_bucket_beam *beam,
h2_beam_can_beam_callback *cb, void *ctx);
diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c
index 73c838e0e8..fcf6bad4d4 100644
--- a/modules/http2/h2_conn.c
+++ b/modules/http2/h2_conn.c
@@ -308,6 +308,7 @@ conn_rec *h2_slave_create(conn_rec *master, int slave_id, apr_pool_t *parent)
master->id, slave_id);
/* Simulate that we had already a request on this connection. */
c->keepalives = 1;
+ c->aborted = 0;
/* We cannot install the master connection socket on the slaves, as
* modules mess with timeouts/blocking of the socket, with
* unwanted side effects to the master connection processing.
@@ -335,6 +336,7 @@ void h2_slave_destroy(conn_rec *slave)
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave,
"h2_stream(%s): destroy slave",
apr_table_get(slave->notes, H2_TASK_ID_NOTE));
+ slave->sbh = NULL;
apr_pool_destroy(slave->pool);
}
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 7eb101db49..78ae81365f 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -131,11 +131,6 @@ static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t len
h2_stream_in_consumed(ctx, length);
}
-static int can_always_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
-{
- return 1;
-}
-
static void stream_joined(h2_mplx *m, h2_stream *stream)
{
ap_assert(!stream->task || stream->task->worker_done);
@@ -152,8 +147,10 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream)
h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
h2_beam_abort(stream->input);
}
- h2_beam_on_produced(stream->output, NULL, NULL);
- h2_beam_leave(stream->output);
+ if (stream->output) {
+ h2_beam_on_produced(stream->output, NULL, NULL);
+ h2_beam_leave(stream->output);
+ }
h2_stream_cleanup(stream);
@@ -246,10 +243,10 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
m->readyq = h2_iq_create(m->pool, m->max_streams);
m->workers = workers;
- m->workers_max = workers->max_workers;
- m->workers_limit = 6; /* the original h1 max parallel connections */
+ m->max_active = workers->max_workers;
+ m->limit_active = 6; /* the original h1 max parallel connections */
m->last_limit_change = m->last_idle_block = apr_time_now();
- m->limit_change_interval = apr_time_from_msec(200);
+ m->limit_change_interval = apr_time_from_msec(100);
m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*));
@@ -301,7 +298,7 @@ static void task_destroy(h2_mplx *m, h2_task *task)
int reuse_slave = 0;
slave = task->c;
- reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
+ reuse_slave = ((m->spare_slaves->nelts < (m->limit_active * 3 / 2))
&& !task->rst_error);
if (slave) {
@@ -328,12 +325,6 @@ static int stream_destroy_iter(void *ctx, void *val)
h2_ihash_remove(m->spurge, stream->id);
ap_assert(stream->state == H2_SS_CLEANUP);
- if (stream->output == NULL) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, m->c,
- H2_STRM_MSG(stream, "already with beams==NULL"));
- return 0;
- }
-
if (stream->input) {
/* Process outstanding events before destruction */
input_consumed_signal(m, stream);
@@ -341,10 +332,7 @@ static int stream_destroy_iter(void *ctx, void *val)
h2_beam_destroy(stream->input);
stream->input = NULL;
}
-
- h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy");
- h2_beam_destroy(stream->output);
- stream->output = NULL;
+
if (stream->task) {
task_destroy(m, stream->task);
stream->task = NULL;
@@ -547,10 +535,13 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
apr_status_t status = APR_SUCCESS;
h2_stream *stream = h2_ihash_get(m->streams, stream_id);
- if (!stream || !stream->task) {
+ if (!stream || !stream->task || m->aborted) {
return APR_ECONNABORTED;
}
+ ap_assert(stream->output == NULL);
+ stream->output = beam;
+
if (APLOGctrace2(m->c)) {
h2_beam_log(beam, m->c, APLOG_TRACE2, "out_open");
}
@@ -561,8 +552,8 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
h2_beam_on_produced(stream->output, output_produced, m);
- if (!stream->task->output.copy_files) {
- h2_beam_on_file_beam(stream->output, can_always_beam_file, m);
+ if (stream->task->output.copy_files) {
+ h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
}
/* time to protect the beam against multi-threaded use */
@@ -721,7 +712,7 @@ static h2_task *next_stream_task(h2_mplx *m)
{
h2_stream *stream;
int sid;
- while (!m->aborted && (m->workers_busy < m->workers_limit)
+ while (!m->aborted && (m->tasks_active < m->limit_active)
&& (sid = h2_iq_shift(m->q)) > 0) {
stream = h2_ihash_get(m->streams, sid);
@@ -731,36 +722,37 @@ static h2_task *next_stream_task(h2_mplx *m)
pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
if (pslave) {
slave = *pslave;
+ slave->aborted = 0;
}
else {
slave = h2_slave_create(m->c, stream->id, m->pool);
}
- slave->sbh = m->c->sbh;
- slave->aborted = 0;
if (!stream->task) {
- stream->task = h2_task_create(stream, slave);
-
+
m->c->keepalives++;
- apr_table_setn(slave->notes, H2_TASK_ID_NOTE, stream->task->id);
- h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
-
if (sid > m->max_stream_started) {
m->max_stream_started = sid;
}
-
if (stream->input) {
h2_beam_on_consumed(stream->input, stream_input_ev,
stream_input_consumed, stream);
- h2_beam_on_file_beam(stream->input, can_always_beam_file, m);
- h2_beam_mutex_enable(stream->input);
}
- h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
+ stream->task = h2_task_create(slave, stream->id,
+ stream->request, m, stream->input,
+ stream->session->s->timeout,
+ m->stream_max_mem);
+ if (!stream->task) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave,
+ H2_STRM_LOG(APLOGNO(02941), stream,
+ "create task"));
+ return NULL;
+ }
+
}
- stream->task->worker_started = 1;
- stream->task->started_at = apr_time_now();
- ++m->workers_busy;
+
+ ++m->tasks_active;
return stream->task;
}
}
@@ -841,14 +833,14 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
* a block by flow control.
*/
if (task->done_at- m->last_limit_change >= m->limit_change_interval
- && m->workers_limit < m->workers_max) {
+ && m->limit_active < m->max_active) {
/* Well behaving stream, allow it more workers */
- m->workers_limit = H2MIN(m->workers_limit * 2,
- m->workers_max);
+ m->limit_active = H2MIN(m->limit_active * 2,
+ m->max_active);
m->last_limit_change = task->done_at;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): increase worker limit to %d",
- m->id, m->workers_limit);
+ m->id, m->limit_active);
}
}
@@ -870,7 +862,9 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
h2_beam_mutex_disable(stream->input);
h2_beam_leave(stream->input);
}
- h2_beam_mutex_disable(stream->output);
+ if (stream->output) {
+ h2_beam_mutex_disable(stream->output);
+ }
check_data_for(m, stream->id);
}
else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
@@ -881,7 +875,9 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
h2_beam_mutex_disable(stream->input);
h2_beam_leave(stream->input);
}
- h2_beam_mutex_disable(stream->output);
+ if (stream->output) {
+ h2_beam_mutex_disable(stream->output);
+ }
stream_joined(m, stream);
}
else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) {
@@ -903,7 +899,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
task_done(m, task, NULL);
- --m->workers_busy;
+ --m->tasks_active;
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
}
@@ -982,14 +978,14 @@ static apr_status_t unschedule_slow_tasks(h2_mplx *m)
/* Try to get rid of streams that occupy workers. Look for safe requests
* that are repeatable. If none found, fail the connection.
*/
- n = (m->workers_busy - m->workers_limit - (int)h2_ihash_count(m->sredo));
+ n = (m->tasks_active - m->limit_active - (int)h2_ihash_count(m->sredo));
while (n > 0 && (stream = get_latest_repeatable_unsubmitted_stream(m))) {
h2_task_rst(stream->task, H2_ERR_CANCEL);
h2_ihash_add(m->sredo, stream);
--n;
}
- if ((m->workers_busy - h2_ihash_count(m->sredo)) > m->workers_limit) {
+ if ((m->tasks_active - h2_ihash_count(m->sredo)) > m->limit_active) {
h2_stream *stream = get_timed_out_busy_stream(m);
if (stream) {
/* Too many busy workers, unable to cancel enough streams
@@ -1009,7 +1005,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
apr_size_t scount = h2_ihash_count(m->streams);
- if (scount > 0 && m->workers_busy) {
+ if (scount > 0 && m->tasks_active) {
/* If we have streams in connection state 'IDLE', meaning
* all streams are ready to sent data out, but lack
* WINDOW_UPDATEs.
@@ -1024,27 +1020,27 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
*/
now = apr_time_now();
m->last_idle_block = now;
- if (m->workers_limit > 2
+ if (m->limit_active > 2
&& now - m->last_limit_change >= m->limit_change_interval) {
- if (m->workers_limit > 16) {
- m->workers_limit = 16;
+ if (m->limit_active > 16) {
+ m->limit_active = 16;
}
- else if (m->workers_limit > 8) {
- m->workers_limit = 8;
+ else if (m->limit_active > 8) {
+ m->limit_active = 8;
}
- else if (m->workers_limit > 4) {
- m->workers_limit = 4;
+ else if (m->limit_active > 4) {
+ m->limit_active = 4;
}
- else if (m->workers_limit > 2) {
- m->workers_limit = 2;
+ else if (m->limit_active > 2) {
+ m->limit_active = 2;
}
m->last_limit_change = now;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): decrease worker limit to %d",
- m->id, m->workers_limit);
+ m->id, m->limit_active);
}
- if (m->workers_busy > m->workers_limit) {
+ if (m->tasks_active > m->limit_active) {
status = unschedule_slow_tasks(m);
}
}
@@ -1257,7 +1253,7 @@ int h2_mplx_awaits_data(h2_mplx *m)
if (h2_ihash_empty(m->streams)) {
waiting = 0;
}
- if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->workers_busy) {
+ if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
waiting = 0;
}
leave_mutex(m, acquired);
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h
index 23162e8eb8..82a98fce0a 100644
--- a/modules/http2/h2_mplx.h
+++ b/modules/http2/h2_mplx.h
@@ -74,9 +74,9 @@ struct h2_mplx {
int max_streams; /* max # of concurrent streams */
int max_stream_started; /* highest stream id that started processing */
- int workers_busy; /* # of workers processing on this mplx */
- int workers_limit; /* current # of workers limit, dynamic */
- int workers_max; /* max, hard limit # of workers in a process */
+ int tasks_active; /* # of tasks being processed from this mplx */
+ int limit_active; /* current limit on active tasks, dynamic */
+ int max_active; /* max, hard limit # of active tasks in a process */
apr_time_t last_idle_block; /* last time, this mplx entered IDLE while
* streams were ready */
apr_time_t last_limit_change; /* last time, worker limit changed */
diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c
index c7126beb95..f37741b61f 100644
--- a/modules/http2/h2_session.c
+++ b/modules/http2/h2_session.c
@@ -870,8 +870,8 @@ static apr_status_t h2_session_create_int(h2_session **psession,
"push_diary(type=%d,N=%d)"),
(int)session->max_stream_count,
(int)session->max_stream_mem,
- session->mplx->workers_limit,
- session->mplx->workers_max,
+ session->mplx->limit_active,
+ session->mplx->max_active,
session->push_diary->dtype,
(int)session->push_diary->N);
}
diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c
index 0b66af84e7..7bf35aa3b2 100644
--- a/modules/http2/h2_stream.c
+++ b/modules/http2/h2_stream.c
@@ -241,7 +241,7 @@ static apr_status_t close_input(h2_stream *stream)
static apr_status_t close_output(h2_stream *stream)
{
- if (h2_beam_is_closed(stream->output)) {
+ if (!stream->output || h2_beam_is_closed(stream->output)) {
return APR_SUCCESS;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
@@ -531,9 +531,6 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
stream->monitor = monitor;
stream->max_mem = session->max_stream_mem;
- h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0,
- session->s->timeout);
-
#ifdef H2_NG2_LOCAL_WIN_SIZE
stream->in_window_size =
nghttp2_session_get_stream_local_window_size(
@@ -607,7 +604,9 @@ void h2_stream_rst(h2_stream *stream, int error_code)
if (stream->input) {
h2_beam_abort(stream->input);
}
- h2_beam_leave(stream->output);
+ if (stream->output) {
+ h2_beam_leave(stream->output);
+ }
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
H2_STRM_MSG(stream, "reset, error=%d"), error_code);
h2_stream_dispatch(stream, H2_SEV_CANCELLED);
@@ -777,6 +776,8 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
*presponse = NULL;
}
+ ap_assert(stream);
+
if (stream->rst_error) {
*plen = 0;
*peos = 1;
@@ -785,7 +786,7 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
c = stream->session->c;
prep_output(stream);
-
+
/* determine how much we'd like to send. We cannot send more than
* is requested. But we can reduce the size in case the master
* connection operates in smaller chunks. (TSL warmup) */
@@ -797,8 +798,15 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
h2_util_bb_avail(stream->out_buffer, plen, peos);
if (!*peos && *plen < requested && *plen < stream->max_mem) {
H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
- status = h2_beam_receive(stream->output, stream->out_buffer,
- APR_NONBLOCK_READ, stream->max_mem - *plen);
+ if (stream->output) {
+ status = h2_beam_receive(stream->output, stream->out_buffer,
+ APR_NONBLOCK_READ,
+ stream->max_mem - *plen);
+ }
+ else {
+ status = APR_EOF;
+ }
+
if (APR_STATUS_IS_EOF(status)) {
apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c
index a93fce4cd2..5ab485faab 100644
--- a/modules/http2/h2_task.c
+++ b/modules/http2/h2_task.c
@@ -496,42 +496,44 @@ static int h2_task_pre_conn(conn_rec* c, void *arg)
return OK;
}
-h2_task *h2_task_create(h2_stream *stream, conn_rec *slave)
+h2_task *h2_task_create(conn_rec *slave, int stream_id,
+ const h2_request *req, h2_mplx *m,
+ h2_bucket_beam *input,
+ apr_interval_time_t timeout,
+ apr_size_t output_max_mem)
{
apr_pool_t *pool;
h2_task *task;
ap_assert(slave);
- ap_assert(stream);
- ap_assert(stream->request);
+ ap_assert(req);
apr_pool_create(&pool, slave->pool);
task = apr_pcalloc(pool, sizeof(h2_task));
if (task == NULL) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave,
- H2_STRM_LOG(APLOGNO(02941), stream, "create task"));
return NULL;
}
- task->id = apr_psprintf(pool, "%ld-%d",
- stream->session->id, stream->id);
- task->stream_id = stream->id;
+ task->id = "000";
+ task->stream_id = stream_id;
task->c = slave;
- task->mplx = stream->session->mplx;
- task->c->keepalives = slave->master->keepalives;
+ task->mplx = m;
task->pool = pool;
- task->request = stream->request;
- task->input.beam = stream->input;
- task->output.beam = stream->output;
- task->timeout = stream->session->s->timeout;
-
- h2_beam_send_from(stream->output, task->pool);
- h2_ctx_create_for(slave, task);
-
+ task->request = req;
+ task->timeout = timeout;
+ task->input.beam = input;
+ task->output.max_buffer = output_max_mem;
+
return task;
}
void h2_task_destroy(h2_task *task)
{
+ if (task->output.beam) {
+ h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "task_destroy");
+ h2_beam_destroy(task->output.beam);
+ task->output.beam = NULL;
+ }
+
if (task->eor) {
apr_bucket_destroy(task->eor);
}
@@ -542,9 +544,14 @@ void h2_task_destroy(h2_task *task)
apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id)
{
+ conn_rec *c;
+
ap_assert(task);
-
- if (task->c->master) {
+ c = task->c;
+ task->worker_started = 1;
+ task->started_at = apr_time_now();
+
+ if (c->master) {
/* Each conn_rec->id is supposed to be unique at a point in time. Since
* some modules (and maybe external code) uses this id as an identifier
* for the request_rec they handle, it needs to be unique for slave
@@ -562,6 +569,8 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id)
*/
int slave_id, free_bits;
+ task->id = apr_psprintf(task->pool, "%ld-%d", c->master->id,
+ task->stream_id);
if (sizeof(unsigned long) >= 8) {
free_bits = 32;
slave_id = task->stream_id;
@@ -573,12 +582,31 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id)
free_bits = 8;
slave_id = worker_id;
}
- task->c->id = (task->c->master->id << free_bits)^slave_id;
+ task->c->id = (c->master->id << free_bits)^slave_id;
+ c->keepalive = AP_CONN_KEEPALIVE;
+ }
+
+ h2_beam_create(&task->output.beam, c->pool, task->stream_id, "output",
+ H2_BEAM_OWNER_SEND, 0, task->timeout);
+ if (!task->output.beam) {
+ return APR_ENOMEM;
}
- task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc);
+ h2_beam_buffer_size_set(task->output.beam, task->output.max_buffer);
+ h2_beam_send_from(task->output.beam, task->pool);
+
+ h2_ctx_create_for(c, task);
+ apr_table_setn(c->notes, H2_TASK_ID_NOTE, task->id);
+
+ if (task->input.beam) {
+ h2_beam_mutex_enable(task->input.beam);
+ }
+
+ h2_slave_run_pre_connection(c, ap_get_conn_socket(c));
+
+ task->input.bb = apr_brigade_create(task->pool, c->bucket_alloc);
if (task->request->serialize) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): serialize request %s %s",
task->id, task->request->method, task->request->path);
apr_brigade_printf(task->input.bb, NULL,
@@ -588,20 +616,21 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id)
apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): process connection", task->id);
+
task->c->current_thread = thread;
- ap_run_process_connection(task->c);
+ ap_run_process_connection(c);
if (task->frozen) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): process_conn returned frozen task",
task->id);
/* cleanup delayed */
return APR_EAGAIN;
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): processing done", task->id);
return output_finish(task);
}
diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h
index b2aaf80777..a0875574ec 100644
--- a/modules/http2/h2_task.h
+++ b/modules/http2/h2_task.h
@@ -73,6 +73,7 @@ struct h2_task {
unsigned int copy_files : 1;
struct h2_response_parser *rparser;
apr_bucket_brigade *bb;
+ apr_size_t max_buffer;
} output;
struct h2_mplx *mplx;
@@ -91,7 +92,11 @@ struct h2_task {
struct h2_req_engine *assigned; /* engine that task has been assigned to */
};
-h2_task *h2_task_create(struct h2_stream *stream, conn_rec *slave);
+h2_task *h2_task_create(conn_rec *slave, int stream_id,
+ const h2_request *req, struct h2_mplx *m,
+ struct h2_bucket_beam *input,
+ apr_interval_time_t timeout,
+ apr_size_t output_max_mem);
void h2_task_destroy(h2_task *task);