summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES8
-rw-r--r--CMakeLists.txt2
-rw-r--r--modules/http2/NWGNUmod_http22
-rw-r--r--modules/http2/config2.m42
-rw-r--r--modules/http2/h2.h30
-rw-r--r--modules/http2/h2_bucket_beam.c33
-rw-r--r--modules/http2/h2_bucket_beam.h4
-rw-r--r--modules/http2/h2_conn.c37
-rw-r--r--modules/http2/h2_conn.h4
-rw-r--r--modules/http2/h2_conn_io.c4
-rw-r--r--modules/http2/h2_filter.c219
-rw-r--r--modules/http2/h2_filter.h30
-rw-r--r--modules/http2/h2_from_h1.c541
-rw-r--r--modules/http2/h2_from_h1.h44
-rw-r--r--modules/http2/h2_h2.c34
-rw-r--r--modules/http2/h2_headers.c161
-rw-r--r--modules/http2/h2_headers.h70
-rw-r--r--modules/http2/h2_mplx.c229
-rw-r--r--modules/http2/h2_mplx.h21
-rw-r--r--modules/http2/h2_ngn_shed.c34
-rw-r--r--modules/http2/h2_ngn_shed.h12
-rw-r--r--modules/http2/h2_proxy_util.c1
-rw-r--r--modules/http2/h2_push.c21
-rw-r--r--modules/http2/h2_push.h7
-rw-r--r--modules/http2/h2_request.c96
-rw-r--r--modules/http2/h2_request.h4
-rw-r--r--modules/http2/h2_response.c219
-rw-r--r--modules/http2/h2_response.h76
-rw-r--r--modules/http2/h2_session.c225
-rw-r--r--modules/http2/h2_session.h6
-rw-r--r--modules/http2/h2_stream.c358
-rw-r--r--modules/http2/h2_stream.h69
-rw-r--r--modules/http2/h2_task.c388
-rw-r--r--modules/http2/h2_task.h24
-rw-r--r--modules/http2/h2_util.c23
-rw-r--r--modules/http2/h2_util.h13
-rw-r--r--modules/http2/h2_version.h4
-rw-r--r--modules/http2/mod_http2.c12
-rw-r--r--modules/http2/mod_http2.dsp2
39 files changed, 1443 insertions, 1626 deletions
diff --git a/CHANGES b/CHANGES
index 650a3657dd..381adf3f28 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,14 @@
-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) mod_http2: rewrite of how responses and trailers are transferred between
+ master and slave connection. Reduction of internal states for tasks
+ and streams, stability. Heuristic id generation for slave connections
+ to better keep promise of connection ids unique at given point int time.
+ Fix for mod_cgid interop in high load situtations.
+ Fix for handling of incoming trailers when no request body is sent.
+ [Stefan Eissing]
+
*) event: Avoid listener periodic wake ups by using the pollset wake-ability
when available. PR 57399. [Yann Ylavic, Luca Toscano]
diff --git a/CMakeLists.txt b/CMakeLists.txt
index bfd40cb869..446c1f4e71 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -434,7 +434,7 @@ SET(mod_http2_extra_sources
modules/http2/h2_from_h1.c modules/http2/h2_h2.c
modules/http2/h2_bucket_beam.c
modules/http2/h2_mplx.c modules/http2/h2_push.c
- modules/http2/h2_request.c modules/http2/h2_response.c
+ modules/http2/h2_request.c modules/http2/h2_headers.c
modules/http2/h2_session.c modules/http2/h2_stream.c
modules/http2/h2_switch.c modules/http2/h2_ngn_shed.c
modules/http2/h2_task.c modules/http2/h2_util.c
diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2
index 40f7181f25..2360e40f56 100644
--- a/modules/http2/NWGNUmod_http2
+++ b/modules/http2/NWGNUmod_http2
@@ -199,7 +199,7 @@ FILES_nlm_objs = \
$(OBJDIR)/h2_ngn_shed.o \
$(OBJDIR)/h2_push.o \
$(OBJDIR)/h2_request.o \
- $(OBJDIR)/h2_response.o \
+ $(OBJDIR)/h2_headers.o \
$(OBJDIR)/h2_session.o \
$(OBJDIR)/h2_stream.o \
$(OBJDIR)/h2_switch.o \
diff --git a/modules/http2/config2.m4 b/modules/http2/config2.m4
index cccbbc8536..ceb183533d 100644
--- a/modules/http2/config2.m4
+++ b/modules/http2/config2.m4
@@ -30,11 +30,11 @@ h2_ctx.lo dnl
h2_filter.lo dnl
h2_from_h1.lo dnl
h2_h2.lo dnl
+h2_headers.lo dnl
h2_mplx.lo dnl
h2_ngn_shed.lo dnl
h2_push.lo dnl
h2_request.lo dnl
-h2_response.lo dnl
h2_session.lo dnl
h2_stream.lo dnl
h2_switch.lo dnl
diff --git a/modules/http2/h2.h b/modules/http2/h2.h
index b8eac1046c..03e6e3e623 100644
--- a/modules/http2/h2.h
+++ b/modules/http2/h2.h
@@ -47,6 +47,9 @@ extern const char *H2_MAGIC_TOKEN;
#define H2_HEADER_PATH_LEN 5
#define H2_CRLF "\r\n"
+/* Max data size to write so it fits inside a TLS record */
+#define H2_DATA_CHUNK_SIZE ((16*1024) - 100 - 9)
+
/* Maximum number of padding bytes in a frame, rfc7540 */
#define H2_MAX_PADLEN 256
/* Initial default window size, RFC 7540 ch. 6.5.2 */
@@ -115,38 +118,25 @@ typedef struct h2_session_props {
typedef struct h2_request h2_request;
struct h2_request {
- apr_uint32_t id; /* stream id */
- apr_uint32_t initiated_on; /* initiating stream id (PUSH) or 0 */
-
const char *method; /* pseudo header values, see ch. 8.1.2.3 */
const char *scheme;
const char *authority;
const char *path;
apr_table_t *headers;
- apr_table_t *trailers;
apr_time_t request_time;
- apr_off_t content_length;
- unsigned int chunked : 1; /* iff requst body needs to be forwarded as chunked */
- unsigned int body : 1; /* iff this request has a body */
+ unsigned int chunked : 1; /* iff requst body needs to be forwarded as chunked */
unsigned int serialize : 1; /* iff this request is written in HTTP/1.1 serialization */
- unsigned int push_policy; /* which push policy to use for this request */
};
-typedef struct h2_response h2_response;
+typedef struct h2_headers h2_headers;
-struct h2_response {
- int stream_id;
- int rst_error;
- int http_status;
- apr_off_t content_length;
+struct h2_headers {
+ int status;
apr_table_t *headers;
- apr_table_t *trailers;
- struct h2_response *next;
-
- const char *sos_filter;
+ apr_table_t *notes;
};
typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_off_t len);
@@ -155,7 +145,7 @@ typedef int h2_stream_pri_cmp(int stream_id1, int stream_id2, void *ctx);
/* Note key to attach connection task id to conn_rec/request_rec instances */
-#define H2_TASK_ID_NOTE "http2-task-id"
-
+#define H2_TASK_ID_NOTE "http2-task-id"
+#define H2_FILTER_DEBUG_NOTE "http2-debug"
#endif /* defined(__mod_h2__h2__) */
diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c
index 01347363ee..7860d26170 100644
--- a/modules/http2/h2_bucket_beam.c
+++ b/modules/http2/h2_bucket_beam.c
@@ -21,6 +21,7 @@
#include <apr_thread_cond.h>
#include <httpd.h>
+#include <http_protocol.h>
#include <http_log.h>
#include "h2_private.h"
@@ -170,6 +171,14 @@ const apr_bucket_type_t h2_bucket_type_beam = {
* h2_blist, a brigade without allocations
******************************************************************************/
+APR_HOOK_STRUCT(
+ APR_HOOK_LINK(beam_bucket)
+)
+AP_IMPLEMENT_HOOK_RUN_FIRST(apr_bucket *, beam_bucket,
+ (h2_bucket_beam *beam, apr_bucket_brigade *dest,
+ const apr_bucket *src),
+ (beam, dest, src), NULL)
+
apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
const char *tag, const char *sep,
h2_blist *bl)
@@ -518,10 +527,12 @@ void h2_beam_abort(h2_bucket_beam *beam)
h2_beam_lock bl;
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- r_purge_reds(beam);
- h2_blist_cleanup(&beam->red);
- beam->aborted = 1;
- report_consumption(beam, 0);
+ if (!beam->aborted) {
+ beam->aborted = 1;
+ r_purge_reds(beam);
+ h2_blist_cleanup(&beam->red);
+ report_consumption(beam, 0);
+ }
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
}
@@ -792,8 +803,10 @@ transfer:
else if (APR_BUCKET_IS_FLUSH(bred)) {
bgreen = apr_bucket_flush_create(bb->bucket_alloc);
}
- else {
- /* put red into hold, no green sent out */
+ else if (AP_BUCKET_IS_ERROR(bred)) {
+ ap_bucket_error *eb = (ap_bucket_error *)bred;
+ bgreen = ap_bucket_error_create(eb->status, eb->data,
+ bb->p, bb->bucket_alloc);
}
}
else if (APR_BUCKET_IS_FILE(bred)) {
@@ -846,6 +859,14 @@ transfer:
remain -= bgreen->length;
++transferred;
}
+ else {
+ bgreen = ap_run_beam_bucket(beam, bb, bred);
+ while (bgreen && bgreen != APR_BRIGADE_SENTINEL(bb)) {
+ ++transferred;
+ remain -= bgreen->length;
+ bgreen = APR_BUCKET_NEXT(bgreen);
+ }
+ }
}
if (readbytes > 0 && remain < 0) {
diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h
index 1e486e9a24..db46baacd6 100644
--- a/modules/http2/h2_bucket_beam.h
+++ b/modules/http2/h2_bucket_beam.h
@@ -373,4 +373,8 @@ int h2_beam_was_received(h2_bucket_beam *beam);
apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam);
+AP_DECLARE_HOOK(apr_bucket *, beam_bucket,
+ (h2_bucket_beam *beam, apr_bucket_brigade *dest,
+ const apr_bucket *src))
+
#endif /* h2_bucket_beam_h */
diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c
index 4ddf1b7029..c2e8e0ef34 100644
--- a/modules/http2/h2_conn.c
+++ b/modules/http2/h2_conn.c
@@ -14,6 +14,7 @@
*/
#include <assert.h>
+#include <apr_strings.h>
#include <ap_mpm.h>
@@ -240,12 +241,13 @@ apr_status_t h2_conn_pre_close(struct h2_ctx *ctx, conn_rec *c)
return status;
}
-conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent,
- apr_allocator_t *allocator)
+conn_rec *h2_slave_create(conn_rec *master, apr_uint32_t slave_id,
+ apr_pool_t *parent, apr_allocator_t *allocator)
{
apr_pool_t *pool;
conn_rec *c;
void *cfg;
+ unsigned long l;
AP_DEBUG_ASSERT(master);
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, master,
@@ -271,8 +273,29 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent,
}
memcpy(c, master, sizeof(conn_rec));
-
- /* Replace these */
+
+ /* Each conn_rec->id is supposed to be unique at a point in time. Since
+ * some modules (and maybe external code) uses this id as an identifier
+ * for the request_rec they handle, it needs to be unique for slave
+ * connections also.
+ * The connection id is generated by the MPM and most MPMs use the formula
+ * id := (child_num * max_threads) + thread_num
+ * which means that there is a maximum id of about
+ * idmax := max_child_count * max_threads
+ * If we assume 2024 child processes with 2048 threads max, we get
+ * idmax ~= 2024 * 2048 = 2 ** 22
+ * On 32 bit systems, we have not much space left, but on 64 bit systems
+ * (and higher?) we can use the upper 32 bits without fear of collision.
+ * 32 bits is just what we need, since a connection can only handle so
+ * many streams.
+ */
+ l = master->id;
+ if (sizeof(long) >= 8 && l < APR_UINT32_MAX) {
+ c->id = l|(((unsigned long)slave_id) << 32);
+ }
+ else {
+ c->id = l^(~slave_id);
+ }
c->master = master;
c->pool = pool;
c->conn_config = ap_create_conn_config(pool);
@@ -284,7 +307,8 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent,
c->data_in_output_filters = 0;
c->clogging_input_filters = 1;
c->log = NULL;
- c->log_id = NULL;
+ c->log_id = apr_psprintf(pool, "%ld-%d",
+ master->id, slave_id);
/* Simulate that we had already a request on this connection. */
c->keepalives = 1;
/* We cannot install the master connection socket on the slaves, as
@@ -304,6 +328,9 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent,
ap_set_module_config(c->conn_config, h2_conn_mpm_module(), cfg);
}
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
+ "h2_task: creating conn, master=%ld, sid=%ld, logid=%s",
+ master->id, c->id, c->log_id);
return c;
}
diff --git a/modules/http2/h2_conn.h b/modules/http2/h2_conn.h
index 84f7616858..4c2799696a 100644
--- a/modules/http2/h2_conn.h
+++ b/modules/http2/h2_conn.h
@@ -66,8 +66,8 @@ typedef enum {
h2_mpm_type_t h2_conn_mpm_type(void);
-conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent,
- apr_allocator_t *allocator);
+conn_rec *h2_slave_create(conn_rec *master, apr_uint32_t slave_id,
+ apr_pool_t *parent, apr_allocator_t *allocator);
void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator);
apr_status_t h2_slave_run_pre_connection(conn_rec *slave, apr_socket_t *csd);
diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c
index d72cbe4967..6ba24faa6f 100644
--- a/modules/http2/h2_conn_io.c
+++ b/modules/http2/h2_conn_io.c
@@ -120,8 +120,8 @@ static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level,
line = *buffer? buffer : "(empty)";
}
/* Intentional no APLOGNO */
- ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s",
- c->id, stream_id, tag, line);
+ ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%s)-%s: %s",
+ c->log_id, tag, line);
}
diff --git a/modules/http2/h2_filter.c b/modules/http2/h2_filter.c
index ce94b52ed6..6d9cee16f5 100644
--- a/modules/http2/h2_filter.c
+++ b/modules/http2/h2_filter.c
@@ -18,6 +18,7 @@
#include <apr_strings.h>
#include <httpd.h>
#include <http_core.h>
+#include <http_protocol.h>
#include <http_log.h>
#include <http_connection.h>
#include <scoreboard.h>
@@ -32,7 +33,7 @@
#include "h2_task.h"
#include "h2_stream.h"
#include "h2_request.h"
-#include "h2_response.h"
+#include "h2_headers.h"
#include "h2_stream.h"
#include "h2_session.h"
#include "h2_util.h"
@@ -174,30 +175,92 @@ apr_status_t h2_filter_core_input(ap_filter_t* f,
* http2 connection status handler + stream out source
******************************************************************************/
-static const char *H2_SOS_H2_STATUS = "http2-status";
+typedef struct {
+ apr_bucket_refcount refcount;
+ h2_bucket_event_cb *cb;
+ void *ctx;
+} h2_bucket_observer;
+
+static apr_status_t bucket_read(apr_bucket *b, const char **str,
+ apr_size_t *len, apr_read_type_e block)
+{
+ (void)b;
+ (void)block;
+ *str = NULL;
+ *len = 0;
+ return APR_SUCCESS;
+}
-int h2_filter_h2_status_handler(request_rec *r)
+static void bucket_destroy(void *data)
{
- h2_ctx *ctx = h2_ctx_rget(r);
- h2_task *task;
-
- if (strcmp(r->handler, "http2-status")) {
- return DECLINED;
+ h2_bucket_observer *h = data;
+ if (apr_bucket_shared_destroy(h)) {
+ if (h->cb) {
+ h->cb(h->ctx, H2_BUCKET_EV_BEFORE_DESTROY, NULL);
+ }
+ apr_bucket_free(h);
}
- if (r->method_number != M_GET) {
- return DECLINED;
+}
+
+apr_bucket * h2_bucket_observer_make(apr_bucket *b, h2_bucket_event_cb *cb,
+ void *ctx)
+{
+ h2_bucket_observer *br;
+
+ br = apr_bucket_alloc(sizeof(*br), b->list);
+ br->cb = cb;
+ br->ctx = ctx;
+
+ b = apr_bucket_shared_make(b, br, 0, 0);
+ b->type = &h2_bucket_type_observer;
+ return b;
+}
+
+apr_bucket * h2_bucket_observer_create(apr_bucket_alloc_t *list,
+ h2_bucket_event_cb *cb, void *ctx)
+{
+ apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
+
+ APR_BUCKET_INIT(b);
+ b->free = apr_bucket_free;
+ b->list = list;
+ b = h2_bucket_observer_make(b, cb, ctx);
+ return b;
+}
+
+apr_status_t h2_bucket_observer_fire(apr_bucket *b, h2_bucket_event event)
+{
+ if (H2_BUCKET_IS_OBSERVER(b)) {
+ h2_bucket_observer *l = (h2_bucket_observer *)b->data;
+ return l->cb(l->ctx, event, b);
}
+ return APR_EINVAL;
+}
- task = ctx? h2_ctx_get_task(ctx) : NULL;
- if (task) {
- /* We need to handle the actual output on the main thread, as
- * we need to access h2_session information. */
- apr_table_setn(r->notes, H2_RESP_SOS_NOTE, H2_SOS_H2_STATUS);
- apr_table_setn(r->headers_out, "Content-Type", "application/json");
- r->status = 200;
- return DONE;
+const apr_bucket_type_t h2_bucket_type_observer = {
+ "H2LAZY", 5, APR_BUCKET_METADATA,
+ bucket_destroy,
+ bucket_read,
+ apr_bucket_setaside_noop,
+ apr_bucket_split_notimpl,
+ apr_bucket_shared_copy
+};
+
+apr_bucket *h2_bucket_observer_beam(struct h2_bucket_beam *beam,
+ apr_bucket_brigade *dest,
+ const apr_bucket *src)
+{
+ if (H2_BUCKET_IS_OBSERVER(src)) {
+ h2_bucket_observer *l = (h2_bucket_observer *)src->data;
+ apr_bucket *b = h2_bucket_observer_create(dest->bucket_alloc,
+ l->cb, l->ctx);
+ APR_BRIGADE_INSERT_TAIL(dest, b);
+ l->cb = NULL;
+ l->ctx = NULL;
+ h2_bucket_observer_fire(b, H2_BUCKET_EV_BEFORE_MASTER_SEND);
+ return b;
}
- return DECLINED;
+ return NULL;
}
static apr_status_t bbout(apr_bucket_brigade *bb, const char *fmt, ...)
@@ -337,31 +400,28 @@ static void add_stats(apr_bucket_brigade *bb, h2_session *s,
bbout(bb, " }%s\n", last? "" : ",");
}
-static apr_status_t h2_status_stream_filter(h2_stream *stream)
+static apr_status_t h2_status_insert(h2_task *task, apr_bucket *b)
{
- h2_session *s = stream->session;
- conn_rec *c = s->c;
+ h2_mplx *m = task->mplx;
+ h2_stream *stream = h2_mplx_stream_get(m, task->stream_id);
+ h2_session *s;
+ conn_rec *c;
+
apr_bucket_brigade *bb;
+ apr_bucket *e;
int32_t connFlowIn, connFlowOut;
- if (!stream->response) {
- return APR_EINVAL;
- }
-
- if (!stream->buffer) {
- stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+ if (!stream) {
+ /* stream already done */
+ return APR_SUCCESS;
}
- bb = stream->buffer;
+ s = stream->session;
+ c = s->c;
- apr_table_unset(stream->response->headers, "Content-Length");
- stream->response->content_length = -1;
+ bb = apr_brigade_create(stream->pool, c->bucket_alloc);
connFlowIn = nghttp2_session_get_effective_local_window_size(s->ngh2);
connFlowOut = nghttp2_session_get_remote_window_size(s->ngh2);
- apr_table_setn(stream->response->headers, "conn-flow-in",
- apr_itoa(stream->pool, connFlowIn));
- apr_table_setn(stream->response->headers, "conn-flow-out",
- apr_itoa(stream->pool, connFlowOut));
bbout(bb, "{\n");
bbout(bb, " \"version\": \"draft-01\",\n");
@@ -376,15 +436,96 @@ static apr_status_t h2_status_stream_filter(h2_stream *stream)
add_stats(bb, s, stream, 1);
bbout(bb, "}\n");
+ while ((e = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) {
+ APR_BUCKET_REMOVE(e);
+ APR_BUCKET_INSERT_AFTER(b, e);
+ b = e;
+ }
+ apr_brigade_destroy(bb);
+
return APR_SUCCESS;
}
-apr_status_t h2_stream_filter(h2_stream *stream)
+static apr_status_t status_event(void *ctx, h2_bucket_event event,
+ apr_bucket *b)
{
- const char *fname = stream->response? stream->response->sos_filter : NULL;
- if (fname && !strcmp(H2_SOS_H2_STATUS, fname)) {
- return h2_status_stream_filter(stream);
+ h2_task *task = ctx;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, task->c->master,
+ "status_event(%s): %d", task->id, event);
+ switch (event) {
+ case H2_BUCKET_EV_BEFORE_MASTER_SEND:
+ h2_status_insert(task, b);
+ break;
+ default:
+ break;
}
return APR_SUCCESS;
}
+int h2_filter_h2_status_handler(request_rec *r)
+{
+ h2_ctx *ctx = h2_ctx_rget(r);
+ conn_rec *c = r->connection;
+ h2_task *task;
+ apr_bucket_brigade *bb;
+ apr_bucket *b;
+ apr_status_t status;
+
+ if (strcmp(r->handler, "http2-status")) {
+ return DECLINED;
+ }
+ if (r->method_number != M_GET && r->method_number != M_POST) {
+ return DECLINED;
+ }
+
+ task = ctx? h2_ctx_get_task(ctx) : NULL;
+ if (task) {
+
+ if ((status = ap_discard_request_body(r)) != OK) {
+ return status;
+ }
+
+ /* We need to handle the actual output on the main thread, as
+ * we need to access h2_session information. */
+ r->status = 200;
+ r->clength = -1;
+ r->chunked = 1;
+ apr_table_unset(r->headers_out, "Content-Length");
+ ap_set_content_type(r, "application/json");
+ apr_table_setn(r->notes, H2_FILTER_DEBUG_NOTE, "on");
+
+ bb = apr_brigade_create(r->pool, c->bucket_alloc);
+ b = h2_bucket_observer_create(c->bucket_alloc, status_event, task);
+ APR_BRIGADE_INSERT_TAIL(bb, b);
+ b = apr_bucket_eos_create(c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(bb, b);
+
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+ "status_handler(%s): checking for incoming trailers",
+ task->id);
+ if (r->trailers_in && !apr_is_empty_table(r->trailers_in)) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+ "status_handler(%s): seeing incoming trailers",
+ task->id);
+ apr_table_setn(r->trailers_out, "h2-trailers-in",
+ apr_itoa(r->pool, 1));
+ }
+
+ status = ap_pass_brigade(r->output_filters, bb);
+ if (status == APR_SUCCESS
+ || r->status != HTTP_OK
+ || c->aborted) {
+ return OK;
+ }
+ else {
+ /* no way to know what type of error occurred */
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+ "status_handler(%s): ap_pass_brigade failed",
+ task->id);
+ return AP_FILTER_ERROR;
+ }
+ }
+ return DECLINED;
+}
+
diff --git a/modules/http2/h2_filter.h b/modules/http2/h2_filter.h
index 5ba7d1581b..b3e34cc5ba 100644
--- a/modules/http2/h2_filter.h
+++ b/modules/http2/h2_filter.h
@@ -16,6 +16,8 @@
#ifndef __mod_h2__h2_filter__
#define __mod_h2__h2_filter__
+struct h2_bucket_beam;
+struct h2_headers;
struct h2_stream;
struct h2_session;
@@ -43,9 +45,33 @@ apr_status_t h2_filter_core_input(ap_filter_t* filter,
apr_read_type_e block,
apr_off_t readbytes);
-#define H2_RESP_SOS_NOTE "h2-sos-filter"
+/******* observer bucket ******************************************************/
+
+typedef enum {
+ H2_BUCKET_EV_BEFORE_DESTROY,
+ H2_BUCKET_EV_BEFORE_MASTER_SEND
+} h2_bucket_event;
+
+extern const apr_bucket_type_t h2_bucket_type_observer;
+
+typedef apr_status_t h2_bucket_event_cb(void *ctx, h2_bucket_event event, apr_bucket *b);
+
+#define H2_BUCKET_IS_OBSERVER(e) (e->type == &h2_bucket_type_observer)
+
+apr_bucket * h2_bucket_observer_make(apr_bucket *b, h2_bucket_event_cb *cb,
+ void *ctx);
+
+apr_bucket * h2_bucket_observer_create(apr_bucket_alloc_t *list,
+ h2_bucket_event_cb *cb, void *ctx);
+
+apr_status_t h2_bucket_observer_fire(apr_bucket *b, h2_bucket_event event);
+
+apr_bucket *h2_bucket_observer_beam(struct h2_bucket_beam *beam,
+ apr_bucket_brigade *dest,
+ const apr_bucket *src);
+
+/******* /.well-known/h2/state handler ****************************************/
-apr_status_t h2_stream_filter(struct h2_stream *stream);
int h2_filter_h2_status_handler(request_rec *r);
#endif /* __mod_h2__h2_filter__ */
diff --git a/modules/http2/h2_from_h1.c b/modules/http2/h2_from_h1.c
index 876ec58bfb..b7429dc7d4 100644
--- a/modules/http2/h2_from_h1.c
+++ b/modules/http2/h2_from_h1.c
@@ -28,190 +28,12 @@
#include <util_time.h>
#include "h2_private.h"
-#include "h2_response.h"
+#include "h2_headers.h"
#include "h2_from_h1.h"
#include "h2_task.h"
#include "h2_util.h"
-static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state);
-
-h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool)
-{
- h2_from_h1 *from_h1 = apr_pcalloc(pool, sizeof(h2_from_h1));
- if (from_h1) {
- from_h1->stream_id = stream_id;
- from_h1->pool = pool;
- from_h1->state = H2_RESP_ST_STATUS_LINE;
- from_h1->hlines = apr_array_make(pool, 10, sizeof(char *));
- }
- return from_h1;
-}
-
-static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state)
-{
- if (from_h1->state != state) {
- from_h1->state = state;
- }
-}
-
-h2_response *h2_from_h1_get_response(h2_from_h1 *from_h1)
-{
- return from_h1->response;
-}
-
-static apr_status_t make_h2_headers(h2_from_h1 *from_h1, request_rec *r)
-{
- from_h1->response = h2_response_create(from_h1->stream_id, 0,
- from_h1->http_status,
- from_h1->hlines,
- r->notes,
- from_h1->pool);
- from_h1->content_length = from_h1->response->content_length;
- from_h1->chunked = r->chunked;
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, APLOGNO(03197)
- "h2_from_h1(%d): converted headers, content-length: %d"
- ", chunked=%d",
- from_h1->stream_id, (int)from_h1->content_length,
- (int)from_h1->chunked);
-
- set_state(from_h1, ((from_h1->chunked || from_h1->content_length > 0)?
- H2_RESP_ST_BODY : H2_RESP_ST_DONE));
- /* We are ready to be sent to the client */
- return APR_SUCCESS;
-}
-
-static apr_status_t parse_header(h2_from_h1 *from_h1, ap_filter_t* f,
- char *line) {
- (void)f;
-
- if (line[0] == ' ' || line[0] == '\t') {
- char **plast;
- /* continuation line from the header before this */
- while (line[0] == ' ' || line[0] == '\t') {
- ++line;
- }
-
- plast = apr_array_pop(from_h1->hlines);
- if (plast == NULL) {
- /* not well formed */
- return APR_EINVAL;
- }
- APR_ARRAY_PUSH(from_h1->hlines, const char*) = apr_psprintf(from_h1->pool, "%s %s", *plast, line);
- }
- else {
- /* new header line */
- APR_ARRAY_PUSH(from_h1->hlines, const char*) = apr_pstrdup(from_h1->pool, line);
- }
- return APR_SUCCESS;
-}
-
-static apr_status_t get_line(h2_from_h1 *from_h1, apr_bucket_brigade *bb,
- ap_filter_t* f, char *line, apr_size_t len)
-{
- apr_status_t status;
- if (!from_h1->bb) {
- from_h1->bb = apr_brigade_create(from_h1->pool, f->c->bucket_alloc);
- }
- else {
- apr_brigade_cleanup(from_h1->bb);
- }
- status = apr_brigade_split_line(from_h1->bb, bb,
- APR_BLOCK_READ,
- HUGE_STRING_LEN);
- if (status == APR_SUCCESS) {
- --len;
- status = apr_brigade_flatten(from_h1->bb, line, &len);
- if (status == APR_SUCCESS) {
- /* we assume a non-0 containing line and remove
- * trailing crlf. */
- line[len] = '\0';
- if (len >= 2 && !strcmp(H2_CRLF, line + len - 2)) {
- len -= 2;
- line[len] = '\0';
- }
-
- apr_brigade_cleanup(from_h1->bb);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_from_h1(%d): read line: %s",
- from_h1->stream_id, line);
- }
- }
- return status;
-}
-
-apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1, ap_filter_t* f,
- apr_bucket_brigade* bb)
-{
- apr_status_t status = APR_SUCCESS;
- char line[HUGE_STRING_LEN];
-
- if ((from_h1->state == H2_RESP_ST_BODY)
- || (from_h1->state == H2_RESP_ST_DONE)) {
- if (from_h1->chunked) {
- /* The httpd core HTTP_HEADER filter has or will install the
- * "CHUNK" output transcode filter, which appears further down
- * the filter chain. We do not want it for HTTP/2.
- * Once we successfully deinstalled it, this filter has no
- * further function and we remove it.
- */
- status = ap_remove_output_filter_byhandle(f->r->output_filters,
- "CHUNK");
- if (status == APR_SUCCESS) {
- ap_remove_output_filter(f);
- }
- }
-
- return ap_pass_brigade(f->next, bb);
- }
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_from_h1(%d): read_response", from_h1->stream_id);
-
- while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
-
- switch (from_h1->state) {
-
- case H2_RESP_ST_STATUS_LINE:
- case H2_RESP_ST_HEADERS:
- status = get_line(from_h1, bb, f, line, sizeof(line));
- if (status != APR_SUCCESS) {
- return status;
- }
- if (from_h1->state == H2_RESP_ST_STATUS_LINE) {
- /* instead of parsing, just take it directly */
- from_h1->http_status = f->r->status;
- from_h1->state = H2_RESP_ST_HEADERS;
- }
- else if (line[0] == '\0') {
- /* end of headers, create the h2_response and
- * pass the rest of the brigade down the filter
- * chain.
- */
- status = make_h2_headers(from_h1, f->r);
- if (from_h1->bb) {
- apr_brigade_destroy(from_h1->bb);
- from_h1->bb = NULL;
- }
- if (!APR_BRIGADE_EMPTY(bb)) {
- return ap_pass_brigade(f->next, bb);
- }
- }
- else {
- status = parse_header(from_h1, f, line);
- }
- break;
-
- default:
- return ap_pass_brigade(f->next, bb);
- }
-
- }
-
- return status;
-}
-
/* This routine is called by apr_table_do and merges all instances of
* the passed field values into a single array that will be further
* processed by some later routine. Originally intended to help split
@@ -345,7 +167,7 @@ static int copy_header(void *ctx, const char *name, const char *value)
return 1;
}
-static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r)
+static h2_headers *create_response(h2_task *task, request_rec *r)
{
const char *clheader;
const char *ctype;
@@ -471,115 +293,316 @@ static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r)
(void *) headers, r->headers_out, NULL);
}
- return h2_response_rcreate(from_h1->stream_id, r, r->status,
- headers, r->pool);
+ return h2_headers_rcreate(r, r->status, headers, r->pool);
}
-apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
+apr_status_t h2_headers_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
{
h2_task *task = f->ctx;
- h2_from_h1 *from_h1 = task->output.from_h1;
request_rec *r = f->r;
- apr_bucket *b;
+ apr_bucket *b, *bresp, *body_bucket = NULL, *next;
ap_bucket_error *eb = NULL;
+ h2_headers *response = NULL;
- AP_DEBUG_ASSERT(from_h1 != NULL);
-
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_from_h1(%d): output_filter called", from_h1->stream_id);
+ "h2_task(%s): output_filter called", task->id);
- if (r->header_only && from_h1->response) {
- /* throw away any data after we have compiled the response */
- apr_brigade_cleanup(bb);
- return OK;
+ if (!task->output.sent_response) {
+ /* check, if we need to send the response now. Until we actually
+ * see a DATA bucket or some EOS/EOR, we do not do so. */
+ for (b = APR_BRIGADE_FIRST(bb);
+ b != APR_BRIGADE_SENTINEL(bb);
+ b = APR_BUCKET_NEXT(b))
+ {
+ if (AP_BUCKET_IS_ERROR(b) && !eb) {
+ eb = b->data;
+ }
+ else if (AP_BUCKET_IS_EOC(b)) {
+ /* If we see an EOC bucket it is a signal that we should get out
+ * of the way doing nothing.
+ */
+ ap_remove_output_filter(f);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, f->c,
+ "h2_task(%s): eoc bucket passed", task->id);
+ return ap_pass_brigade(f->next, bb);
+ }
+ else if (!H2_BUCKET_IS_HEADERS(b) && !APR_BUCKET_IS_FLUSH(b)) {
+ body_bucket = b;
+ break;
+ }
+ }
+
+ if (eb) {
+ int st = eb->status;
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03047)
+ "h2_task(%s): err bucket status=%d", task->id, st);
+ /* throw everything away and replace it with the error response
+ * generated by ap_die() */
+ apr_brigade_cleanup(bb);
+ ap_die(st, r);
+ return AP_FILTER_ERROR;
+ }
+
+ if (body_bucket) {
+ /* time to insert the response bucket before the body */
+ response = create_response(task, r);
+ if (response == NULL) {
+ ap_log_cerror(APLOG_MARK, APLOG_NOTICE, 0, f->c, APLOGNO(03048)
+ "h2_task(%s): unable to create response", task->id);
+ return APR_ENOMEM;
+ }
+
+ bresp = h2_bucket_headers_create(f->c->bucket_alloc, response);
+ APR_BUCKET_INSERT_BEFORE(body_bucket, bresp);
+ /*APR_BRIGADE_INSERT_HEAD(bb, bresp);*/
+ task->output.sent_response = 1;
+ r->sent_bodyct = 1;
+ }
}
- for (b = APR_BRIGADE_FIRST(bb);
- b != APR_BRIGADE_SENTINEL(bb);
- b = APR_BUCKET_NEXT(b))
- {
- if (AP_BUCKET_IS_ERROR(b) && !eb) {
- eb = b->data;
- continue;
+ if (r->header_only) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+ "h2_task(%s): header_only, cleanup output brigade",
+ task->id);
+ b = body_bucket? body_bucket : APR_BRIGADE_FIRST(bb);
+ while (b != APR_BRIGADE_SENTINEL(bb)) {
+ next = APR_BUCKET_NEXT(b);
+ if (APR_BUCKET_IS_EOS(b) || AP_BUCKET_IS_EOR(b)) {
+ break;
+ }
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ b = next;
}
- /*
- * If we see an EOC bucket it is a signal that we should get out
- * of the way doing nothing.
- */
- if (AP_BUCKET_IS_EOC(b)) {
- ap_remove_output_filter(f);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, f->c,
- "h2_from_h1(%d): eoc bucket passed",
- from_h1->stream_id);
- return ap_pass_brigade(f->next, bb);
+ }
+ else if (task->output.sent_response) {
+ /* lets get out of the way, our task is done */
+ ap_remove_output_filter(f);
+ }
+ return ap_pass_brigade(f->next, bb);
+}
+
+static void make_chunk(h2_task *task, apr_bucket_brigade *bb,
+ apr_bucket *first, apr_uint64_t chunk_len,
+ apr_bucket *tail)
+{
+ /* Surround the buckets [first, tail[ with new buckets carrying the
+ * HTTP/1.1 chunked encoding format. If tail is NULL, the chunk extends
+ * to the end of the brigade. */
+ char buffer[128];
+ apr_bucket *c;
+ int len;
+
+ len = apr_snprintf(buffer, H2_ALEN(buffer),
+ "%"APR_UINT64_T_HEX_FMT"\r\n", chunk_len);
+ c = apr_bucket_heap_create(buffer, len, NULL, bb->bucket_alloc);
+ APR_BUCKET_INSERT_BEFORE(first, c);
+ c = apr_bucket_heap_create("\r\n", 2, NULL, bb->bucket_alloc);
+ if (tail) {
+ APR_BUCKET_INSERT_BEFORE(tail, c);
+ }
+ else {
+ APR_BRIGADE_INSERT_TAIL(bb, c);
+ }
+}
+
+static int ser_header(void *ctx, const char *name, const char *value)
+{
+ apr_bucket_brigade *bb = ctx;
+ apr_brigade_printf(bb, NULL, NULL, "%s: %s\r\n", name, value);
+ return 1;
+}
+
+apr_status_t h2_filter_request_in(ap_filter_t* f,
+ apr_bucket_brigade* bb,
+ ap_input_mode_t mode,
+ apr_read_type_e block,
+ apr_off_t readbytes)
+{
+ h2_task *task = f->ctx;
+ request_rec *r = f->r;
+ apr_status_t status = APR_SUCCESS;
+ apr_bucket *b, *next, *first_data = NULL;
+ apr_off_t bblen = 0;
+
+ if (!task->input.chunked) {
+ status = ap_get_brigade(f->next, bb, mode, block, readbytes);
+ /* pipe data through, just take care of trailers */
+ for (b = APR_BRIGADE_FIRST(bb);
+ b != APR_BRIGADE_SENTINEL(bb); b = next) {
+ next = APR_BUCKET_NEXT(b);
+ if (H2_BUCKET_IS_HEADERS(b)) {
+ h2_headers *headers = h2_bucket_headers_get(b);
+ ap_assert(headers);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+ "h2_task(%s): receiving trailers", task->id);
+ r->trailers_in = apr_table_clone(r->pool, headers->headers);
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ ap_remove_input_filter(f);
+ break;
+ }
}
+ return status;
}
+
+ /* Things are more complicated. The standard HTTP input filter, which
+ * does a lot what we do not want to duplicate, also cares about chunked
+ * transfer encoding and trailers.
+ * We need to simulate chunked encoding for it to be happy.
+ */
- if (eb) {
- int st = eb->status;
- apr_brigade_cleanup(bb);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03047)
- "h2_from_h1(%d): err bucket status=%d",
- from_h1->stream_id, st);
- ap_die(st, r);
- return AP_FILTER_ERROR;
+ if (!task->input.bbchunk) {
+ task->input.bbchunk = apr_brigade_create(r->pool, f->c->bucket_alloc);
}
-
- from_h1->response = create_response(from_h1, r);
- if (from_h1->response == NULL) {
- ap_log_cerror(APLOG_MARK, APLOG_NOTICE, 0, f->c, APLOGNO(03048)
- "h2_from_h1(%d): unable to create response",
- from_h1->stream_id);
- return APR_ENOMEM;
+ if (APR_BRIGADE_EMPTY(task->input.bbchunk)) {
+ /* get more data from the lower layer filters. Always do this
+ * in larger pieces, since we handle the read modes ourself.
+ */
+ status = ap_get_brigade(f->next, task->input.bbchunk,
+ AP_MODE_READBYTES, block, 32*1024);
+ if (status == APR_EOF) {
+ if (!task->input.eos) {
+ status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
+ task->input.eos = 1;
+ return APR_SUCCESS;
+ }
+ ap_remove_input_filter(f);
+ return status;
+
+ }
+ else if (status != APR_SUCCESS) {
+ return status;
+ }
+
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+ "h2_task(%s): trailers_in inspecting brigade", task->id);
+ for (b = APR_BRIGADE_FIRST(task->input.bbchunk);
+ b != APR_BRIGADE_SENTINEL(task->input.bbchunk) && !task->input.eos;
+ b = next) {
+ next = APR_BUCKET_NEXT(b);
+ if (APR_BUCKET_IS_METADATA(b)) {
+ if (first_data) {
+ make_chunk(task, task->input.bbchunk, first_data, bblen, b);
+ first_data = NULL;
+ bblen = 0;
+ }
+
+ if (H2_BUCKET_IS_HEADERS(b)) {
+ apr_bucket_brigade *tmp;
+ h2_headers *headers = h2_bucket_headers_get(b);
+
+ ap_assert(headers);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+ "h2_task(%s): receiving trailers", task->id);
+ tmp = apr_brigade_split_ex(task->input.bbchunk, b, NULL);
+ if (!apr_is_empty_table(headers->headers)) {
+ status = apr_brigade_puts(task->input.bbchunk, NULL, NULL, "0\r\n");
+ apr_table_do(ser_header, task->input.bbchunk, headers->headers, NULL);
+ status = apr_brigade_puts(task->input.bbchunk, NULL, NULL, "\r\n");
+ }
+ else {
+ status = apr_brigade_puts(task->input.bbchunk, NULL, NULL, "0\r\n\r\n");
+ }
+ APR_BRIGADE_CONCAT(task->input.bbchunk, tmp);
+ apr_brigade_destroy(tmp);
+ r->trailers_in = apr_table_clone(r->pool, headers->headers);
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ task->input.eos = 1;
+ }
+ else if (APR_BUCKET_IS_EOS(b)) {
+ apr_bucket_brigade *tmp = apr_brigade_split_ex(task->input.bbchunk, b, NULL);
+ status = apr_brigade_puts(task->input.bbchunk, NULL, NULL, "0\r\n\r\n");
+ APR_BRIGADE_CONCAT(task->input.bbchunk, tmp);
+ apr_brigade_destroy(tmp);
+ task->input.eos = 1;
+ }
+ break;
+ }
+ else if (b->length == 0) {
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ }
+ else {
+ if (!first_data) {
+ first_data = b;
+ }
+ bblen += b->length;
+ }
+ }
+
+ if (first_data) {
+ make_chunk(task, task->input.bbchunk, first_data, bblen, NULL);
+ }
+ }
+
+ if (mode == AP_MODE_EXHAUSTIVE) {
+ /* return all we have */
+ APR_BRIGADE_CONCAT(bb, task->input.bbchunk);
+ }
+ else if (mode == AP_MODE_READBYTES) {
+ status = h2_brigade_concat_length(bb, task->input.bbchunk, readbytes);
+ }
+ else if (mode == AP_MODE_SPECULATIVE) {
+ status = h2_brigade_copy_length(bb, task->input.bbchunk, readbytes);
+ }
+ else if (mode == AP_MODE_GETLINE) {
+ /* we are reading a single LF line, e.g. the HTTP headers.
+ * this has the nasty side effect to split the bucket, even
+ * though it ends with CRLF and creates a 0 length bucket */
+ status = apr_brigade_split_line(bb, task->input.bbchunk, block,
+ HUGE_STRING_LEN);
+ if (APLOGctrace1(f->c)) {
+ char buffer[1024];
+ apr_size_t len = sizeof(buffer)-1;
+ apr_brigade_flatten(bb, buffer, &len);
+ buffer[len] = 0;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
+ "h2_task(%s): getline: %s",
+ task->id, buffer);
+ }
}
-
- if (r->header_only) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_from_h1(%d): header_only, cleanup output brigade",
- from_h1->stream_id);
- apr_brigade_cleanup(bb);
- return OK;
+ else {
+ /* Hmm, well. There is mode AP_MODE_EATCRLF, but we chose not
+ * to support it. Seems to work. */
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOTIMPL, f->c,
+ APLOGNO(02942)
+ "h2_task, unsupported READ mode %d", mode);
+ status = APR_ENOTIMPL;
}
- r->sent_bodyct = 1; /* Whatever follows is real body stuff... */
-
- ap_remove_output_filter(f);
- if (APLOGctrace1(f->c)) {
- apr_off_t len = 0;
- apr_brigade_length(bb, 0, &len);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_from_h1(%d): removed header filter, passing brigade "
- "len=%ld", from_h1->stream_id, (long)len);
- }
- return ap_pass_brigade(f->next, bb);
+ h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, "forwarding input", bb);
+ return status;
}
-apr_status_t h2_response_trailers_filter(ap_filter_t *f, apr_bucket_brigade *bb)
+apr_status_t h2_filter_trailers_out(ap_filter_t *f, apr_bucket_brigade *bb)
{
h2_task *task = f->ctx;
- h2_from_h1 *from_h1 = task->output.from_h1;
request_rec *r = f->r;
- apr_bucket *b;
+ apr_bucket *b, *e;
- if (from_h1 && from_h1->response) {
- /* Detect the EOR bucket and forward any trailers that may have
- * been set to our h2_response.
+ if (task && r) {
+ /* Detect the EOS/EOR bucket and forward any trailers that may have
+ * been set to our h2_headers.
*/
for (b = APR_BRIGADE_FIRST(bb);
b != APR_BRIGADE_SENTINEL(bb);
b = APR_BUCKET_NEXT(b))
{
- if (AP_BUCKET_IS_EOR(b)) {
- /* FIXME: need a better test case than this.
- apr_table_setn(r->trailers_out, "X", "1"); */
- if (r->trailers_out && !apr_is_empty_table(r->trailers_out)) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03049)
- "h2_from_h1(%d): trailers filter, saving trailers",
- from_h1->stream_id);
- h2_response_set_trailers(from_h1->response,
- apr_table_clone(from_h1->pool,
- r->trailers_out));
- }
+ if ((APR_BUCKET_IS_EOS(b) || AP_BUCKET_IS_EOR(b))
+ && r->trailers_out && !apr_is_empty_table(r->trailers_out)) {
+ h2_headers *headers;
+ apr_table_t *trailers;
+
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03049)
+ "h2_task(%s): sending trailers", task->id);
+ trailers = apr_table_clone(r->pool, r->trailers_out);
+ headers = h2_headers_rcreate(r, HTTP_OK, trailers, r->pool);
+ e = h2_bucket_headers_create(bb->bucket_alloc, headers);
+ APR_BUCKET_INSERT_BEFORE(b, e);
+ apr_table_clear(r->trailers_out);
+ ap_remove_output_filter(f);
break;
}
}
diff --git a/modules/http2/h2_from_h1.h b/modules/http2/h2_from_h1.h
index 71cc35faa9..9215539668 100644
--- a/modules/http2/h2_from_h1.h
+++ b/modules/http2/h2_from_h1.h
@@ -30,44 +30,18 @@
* we need to have all handlers and filters involved in request/response
* processing, so this seems to be the way for now.
*/
+struct h2_headers;
+struct h2_task;
-typedef enum {
- H2_RESP_ST_STATUS_LINE, /* parsing http/1 status line */
- H2_RESP_ST_HEADERS, /* parsing http/1 response headers */
- H2_RESP_ST_BODY, /* transferring response body */
- H2_RESP_ST_DONE /* complete response converted */
-} h2_from_h1_state_t;
+apr_status_t h2_headers_output_filter(ap_filter_t *f, apr_bucket_brigade *bb);
-struct h2_response;
+apr_status_t h2_filter_request_in(ap_filter_t* f,
+ apr_bucket_brigade* brigade,
+ ap_input_mode_t mode,
+ apr_read_type_e block,
+ apr_off_t readbytes);
-typedef struct h2_from_h1 h2_from_h1;
-
-struct h2_from_h1 {
- int stream_id;
- h2_from_h1_state_t state;
- apr_pool_t *pool;
- apr_bucket_brigade *bb;
-
- apr_off_t content_length;
- int chunked;
-
- int http_status;
- apr_array_header_t *hlines;
-
- struct h2_response *response;
-};
-
-
-h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool);
-
-apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1,
- ap_filter_t* f, apr_bucket_brigade* bb);
-
-struct h2_response *h2_from_h1_get_response(h2_from_h1 *from_h1);
-
-apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb);
-
-apr_status_t h2_response_trailers_filter(ap_filter_t *f, apr_bucket_brigade *bb);
+apr_status_t h2_filter_trailers_out(ap_filter_t *f, apr_bucket_brigade *bb);
void h2_from_h1_set_basic_http_header(apr_table_t *headers, request_rec *r,
apr_pool_t *pool);
diff --git a/modules/http2/h2_h2.c b/modules/http2/h2_h2.c
index fd5338406b..80e1afadfb 100644
--- a/modules/http2/h2_h2.c
+++ b/modules/http2/h2_h2.c
@@ -32,12 +32,15 @@
#include "mod_http2.h"
#include "h2_private.h"
+#include "h2_bucket_beam.h"
#include "h2_stream.h"
#include "h2_task.h"
#include "h2_config.h"
#include "h2_ctx.h"
#include "h2_conn.h"
+#include "h2_filter.h"
#include "h2_request.h"
+#include "h2_headers.h"
#include "h2_session.h"
#include "h2_util.h"
#include "h2_h2.h"
@@ -569,6 +572,10 @@ void h2_h2_register_hooks(void)
*/
ap_hook_post_read_request(h2_h2_post_read_req, NULL, NULL, APR_HOOK_REALLY_FIRST);
ap_hook_fixups(h2_h2_late_fixups, NULL, NULL, APR_HOOK_LAST);
+
+ /* special bucket type transfer through a h2_bucket_beam */
+ ap_hook_beam_bucket(h2_bucket_observer_beam, NULL, NULL, APR_HOOK_MIDDLE);
+ ap_hook_beam_bucket(h2_bucket_headers_beam, NULL, NULL, APR_HOOK_MIDDLE);
}
int h2_h2_process_conn(conn_rec* c)
@@ -684,30 +691,23 @@ static int h2_h2_post_read_req(request_rec *r)
* that we manipulate filters only once. */
if (task && !task->filters_set) {
ap_filter_t *f;
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r, "adding request filters");
- /* setup the correct output filters to process the response
- * on the proper mod_http2 way. */
- ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r, "adding task output filter");
- if (task->ser_headers) {
- ap_add_output_filter("H1_TO_H2_RESP", task, r, r->connection);
- }
- else {
- /* replace the core http filter that formats response headers
- * in HTTP/1 with our own that collects status and headers */
- ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER");
- ap_add_output_filter("H2_RESPONSE", task, r, r->connection);
- }
+ /* setup the correct filters to process the request for h2 */
+ ap_add_input_filter("H2_REQUEST", task, r, r->connection);
+
+ /* replace the core http filter that formats response headers
+ * in HTTP/1 with our own that collects status and headers */
+ ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER");
+ ap_add_output_filter("H2_RESPONSE", task, r, r->connection);
- /* trailers processing. Incoming trailers are added to this
- * request via our h2 input filter, outgoing trailers
- * in a special h2 out filter. */
for (f = r->input_filters; f; f = f->next) {
- if (!strcmp("H2_TO_H1", f->frec->name)) {
+ if (!strcmp("H2_SLAVE_IN", f->frec->name)) {
f->r = r;
break;
}
}
- ap_add_output_filter("H2_TRAILERS", task, r, r->connection);
+ ap_add_output_filter("H2_TRAILERS_OUT", task, r, r->connection);
task->filters_set = 1;
}
}
diff --git a/modules/http2/h2_headers.c b/modules/http2/h2_headers.c
new file mode 100644
index 0000000000..8add79f507
--- /dev/null
+++ b/modules/http2/h2_headers.c
@@ -0,0 +1,161 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stdio.h>
+
+#include <apr_strings.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_log.h>
+#include <util_time.h>
+
+#include <nghttp2/nghttp2.h>
+
+#include "h2_private.h"
+#include "h2_h2.h"
+#include "h2_util.h"
+#include "h2_request.h"
+#include "h2_headers.h"
+
+
+typedef struct {
+ apr_bucket_refcount refcount;
+ h2_headers *headers;
+} h2_bucket_headers;
+
+static apr_status_t bucket_read(apr_bucket *b, const char **str,
+ apr_size_t *len, apr_read_type_e block)
+{
+ (void)b;
+ (void)block;
+ *str = NULL;
+ *len = 0;
+ return APR_SUCCESS;
+}
+
+apr_bucket * h2_bucket_headers_make(apr_bucket *b, h2_headers *r)
+{
+ h2_bucket_headers *br;
+
+ br = apr_bucket_alloc(sizeof(*br), b->list);
+ br->headers = r;
+
+ b = apr_bucket_shared_make(b, br, 0, 0);
+ b->type = &h2_bucket_type_headers;
+
+ return b;
+}
+
+apr_bucket * h2_bucket_headers_create(apr_bucket_alloc_t *list,
+ h2_headers *r)
+{
+ apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
+
+ APR_BUCKET_INIT(b);
+ b->free = apr_bucket_free;
+ b->list = list;
+ b = h2_bucket_headers_make(b, r);
+ return b;
+}
+
+h2_headers *h2_bucket_headers_get(apr_bucket *b)
+{
+ if (H2_BUCKET_IS_HEADERS(b)) {
+ return ((h2_bucket_headers *)b->data)->headers;
+ }
+ return NULL;
+}
+
+const apr_bucket_type_t h2_bucket_type_headers = {
+ "H2HEADERS", 5, APR_BUCKET_METADATA,
+ apr_bucket_destroy_noop,
+ bucket_read,
+ apr_bucket_setaside_noop,
+ apr_bucket_split_notimpl,
+ apr_bucket_shared_copy
+};
+
+apr_bucket *h2_bucket_headers_beam(struct h2_bucket_beam *beam,
+ apr_bucket_brigade *dest,
+ const apr_bucket *src)
+{
+ if (H2_BUCKET_IS_HEADERS(src)) {
+ h2_headers *r = ((h2_bucket_headers *)src->data)->headers;
+ apr_bucket *b = h2_bucket_headers_create(dest->bucket_alloc, r);
+ APR_BRIGADE_INSERT_TAIL(dest, b);
+ return b;
+ }
+ return NULL;
+}
+
+
+h2_headers *h2_headers_create(int status, apr_table_t *headers_in,
+ apr_table_t *notes, apr_pool_t *pool)
+{
+ h2_headers *headers = apr_pcalloc(pool, sizeof(h2_headers));
+ headers->status = status;
+ headers->headers = (headers_in? apr_table_copy(pool, headers_in)
+ : apr_table_make(pool, 5));
+ headers->notes = (notes? apr_table_copy(pool, notes)
+ : apr_table_make(pool, 5));
+ return headers;
+}
+
+h2_headers *h2_headers_rcreate(request_rec *r, int status,
+ apr_table_t *header, apr_pool_t *pool)
+{
+ h2_headers *headers = h2_headers_create(status, header, r->notes, pool);
+ if (headers->status == HTTP_FORBIDDEN) {
+ const char *cause = apr_table_get(r->notes, "ssl-renegotiate-forbidden");
+ if (cause) {
+ /* This request triggered a TLS renegotiation that is now allowed
+ * in HTTP/2. Tell the client that it should use HTTP/1.1 for this.
+ */
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, headers->status, r,
+ APLOGNO(03061)
+ "h2_headers(%ld): renegotiate forbidden, cause: %s",
+ (long)r->connection->id, cause);
+ headers->status = H2_ERR_HTTP_1_1_REQUIRED;
+ }
+ }
+ return headers;
+}
+
+h2_headers *h2_headers_die(apr_status_t type,
+ const h2_request *req, apr_pool_t *pool)
+{
+ h2_headers *headers;
+ char *date;
+
+ headers = apr_pcalloc(pool, sizeof(h2_headers));
+ headers->status = (type >= 200 && type < 600)? type : 500;
+ headers->headers = apr_table_make(pool, 5);
+ headers->notes = apr_table_make(pool, 5);
+
+ date = apr_palloc(pool, APR_RFC822_DATE_LEN);
+ ap_recent_rfc822_date(date, req? req->request_time : apr_time_now());
+ apr_table_setn(headers->headers, "Date", date);
+ apr_table_setn(headers->headers, "Server", ap_get_server_banner());
+
+ return headers;
+}
+
+int h2_headers_are_response(h2_headers *headers)
+{
+ return headers->status >= 200;
+}
+
diff --git a/modules/http2/h2_headers.h b/modules/http2/h2_headers.h
new file mode 100644
index 0000000000..2078cfb705
--- /dev/null
+++ b/modules/http2/h2_headers.h
@@ -0,0 +1,70 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_headers__
+#define __mod_h2__h2_headers__
+
+#include "h2.h"
+
+struct h2_bucket_beam;
+
+extern const apr_bucket_type_t h2_bucket_type_headers;
+
+#define H2_BUCKET_IS_HEADERS(e) (e->type == &h2_bucket_type_headers)
+
+apr_bucket * h2_bucket_headers_make(apr_bucket *b, h2_headers *r);
+
+apr_bucket * h2_bucket_headers_create(apr_bucket_alloc_t *list,
+ h2_headers *r);
+
+h2_headers *h2_bucket_headers_get(apr_bucket *b);
+
+apr_bucket *h2_bucket_headers_beam(struct h2_bucket_beam *beam,
+ apr_bucket_brigade *dest,
+ const apr_bucket *src);
+
+/**
+ * Create the headers from the given status and headers
+ * @param status the headers status
+ * @param header the headers of the headers
+ * @param notes the notes carried by the headers
+ * @param pool the memory pool to use
+ */
+h2_headers *h2_headers_create(int status, apr_table_t *header,
+ apr_table_t *notes, apr_pool_t *pool);
+
+/**
+ * Create the headers from the given request_rec.
+ * @param r the request record which was processed
+ * @param status the headers status
+ * @param header the headers of the headers
+ * @param pool the memory pool to use
+ */
+h2_headers *h2_headers_rcreate(request_rec *r, int status,
+ apr_table_t *header, apr_pool_t *pool);
+
+/**
+ * Create the headers for the given error.
+ * @param stream_id id of the stream to create the headers for
+ * @param type the error code
+ * @param req the original h2_request
+ * @param pool the memory pool to use
+ */
+h2_headers *h2_headers_die(apr_status_t type,
+ const struct h2_request *req, apr_pool_t *pool);
+
+int h2_headers_are_response(h2_headers *headers);
+
+#endif /* defined(__mod_h2__h2_headers__) */
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index d5de4fe17f..461c88d64c 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -28,13 +28,13 @@
#include "mod_http2.h"
+#include "h2.h"
#include "h2_private.h"
#include "h2_bucket_beam.h"
#include "h2_config.h"
#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
-#include "h2_response.h"
#include "h2_mplx.h"
#include "h2_ngn_shed.h"
#include "h2_request.h"
@@ -297,7 +297,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
- m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
m->stream_timeout = stream_timeout;
@@ -444,7 +443,6 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
*/
h2_iq_remove(m->q, stream->id);
h2_ihash_remove(m->sready, stream->id);
- h2_ihash_remove(m->sresume, stream->id);
h2_ihash_remove(m->streams, stream->id);
if (stream->input) {
m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
@@ -516,12 +514,10 @@ static int task_print(void *ctx, void *val)
h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
- "->03198: h2_stream(%s): %s %s %s -> %s %d"
+ "->03198: h2_stream(%s): %s %s %s"
"[orph=%d/started=%d/done=%d/frozen=%d]",
task->id, task->request->method,
task->request->authority, task->request->path,
- task->response? "http" : (task->rst_error? "reset" : "?"),
- task->response? task->response->http_status : task->rst_error,
(stream? 0 : 1), task->worker_started,
task->worker_done, task->frozen);
}
@@ -545,7 +541,7 @@ static int task_abort_connection(void *ctx, void *val)
if (task->input.beam) {
h2_beam_abort(task->input.beam);
}
- if (task->worker_started && !task->worker_done && task->output.beam) {
+ if (task->output.beam) {
h2_beam_abort(task->output.beam);
}
return 1;
@@ -556,9 +552,9 @@ static int report_stream_iter(void *ctx, void *val) {
h2_stream *stream = val;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
- "submitted=%d, suspended=%d",
+ "ready=%d",
m->id, stream->id, stream->started, stream->scheduled,
- stream->submitted, stream->suspended);
+ h2_stream_is_ready(stream));
return 1;
}
@@ -575,9 +571,8 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): release_join with %d streams open, "
- "%d streams resume, %d streams ready, %d tasks",
+ "%d streams ready, %d tasks",
m->id, (int)h2_ihash_count(m->streams),
- (int)h2_ihash_count(m->sresume),
(int)h2_ihash_count(m->sready),
(int)h2_ihash_count(m->tasks));
h2_ihash_iter(m->streams, report_stream_iter, m);
@@ -707,6 +702,19 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream)
return status;
}
+h2_stream *h2_mplx_stream_get(h2_mplx *m, apr_uint32_t id)
+{
+ h2_stream *s = NULL;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ s = h2_ihash_get(m->streams, id);
+ leave_mutex(m, acquired);
+ }
+ return s;
+}
+
void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
{
m->input_consumed = cb;
@@ -730,31 +738,26 @@ static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
}
}
-static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
+static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
{
apr_status_t status = APR_SUCCESS;
h2_task *task = h2_ihash_get(m->tasks, stream_id);
h2_stream *stream = h2_ihash_get(m->streams, stream_id);
+ apr_size_t beamed_count;
if (!task || !stream) {
return APR_ECONNABORTED;
}
- status = h2_task_add_response(task, response);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%s): add response: %d, rst=%d",
- task->id, response->http_status, response->rst_error);
- if (status != APR_SUCCESS) {
- return status;
- }
-
- if (task->output.beam && !task->output.opened) {
- apr_uint32_t beamed_count;
- h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
- h2_beam_timeout_set(task->output.beam, m->stream_timeout);
- h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
- h2_beam_on_produced(task->output.beam, output_produced, m);
- beamed_count = h2_beam_get_files_beamed(task->output.beam);
+ "h2_mplx(%s): out open", task->id);
+
+ if (!stream->output) {
+ h2_beam_buffer_size_set(beam, m->stream_max_mem);
+ h2_beam_timeout_set(beam, m->stream_timeout);
+ h2_beam_on_consumed(beam, stream_output_consumed, task);
+ h2_beam_on_produced(beam, output_produced, m);
+ beamed_count = h2_beam_get_files_beamed(beam);
if (m->tx_handles_reserved >= beamed_count) {
m->tx_handles_reserved -= beamed_count;
}
@@ -762,22 +765,20 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
m->tx_handles_reserved = 0;
}
if (!task->output.copy_files) {
- h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
+ h2_beam_on_file_beam(beam, can_beam_file, m);
}
- h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m);
- task->output.opened = 1;
+ h2_beam_mutex_set(beam, beam_enter, task->cond, m);
+ stream->output = beam;
}
- if (response && response->http_status < 300) {
- /* we might see some file buckets in the output, see
- * if we have enough handles reserved. */
- check_tx_reservation(m);
- }
- have_out_data_for(m, stream, 1);
+ /* we might see some file buckets in the output, see
+ * if we have enough handles reserved. */
+ check_tx_reservation(m);
+ have_out_data_for(m, stream, 0);
return status;
}
-apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
+apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
{
apr_status_t status;
int acquired;
@@ -788,7 +789,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
status = APR_ECONNABORTED;
}
else {
- status = out_open(m, stream_id, response);
+ status = out_open(m, stream_id, beam);
}
leave_mutex(m, acquired);
}
@@ -809,16 +810,6 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task)
return APR_ECONNABORTED;
}
- if (!task->response && !task->rst_error) {
- /* In case a close comes before a response was created,
- * insert an error one so that our streams can properly reset.
- */
- h2_response *r = h2_response_die(task->stream_id, 500,
- task->request, m->pool);
- status = out_open(m, task->stream_id, r);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, APLOGNO(03393)
- "h2_mplx(%s): close, no response, no rst", task->id);
- }
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
"h2_mplx(%s): close", task->id);
if (task->output.beam) {
@@ -842,7 +833,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
if (m->aborted) {
status = APR_ECONNABORTED;
}
- else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) {
+ else if (!h2_ihash_empty(m->sready)) {
status = APR_SUCCESS;
}
else {
@@ -863,13 +854,10 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response)
{
- h2_ihash_t *set;
ap_assert(m);
ap_assert(stream);
-
- set = response? m->sready : m->sresume;
- if (!h2_ihash_get(set, stream->id)) {
- h2_ihash_add(set, stream);
+ if (!h2_ihash_get(m->sready, stream->id)) {
+ h2_ihash_add(m->sready, stream);
if (m->added_output) {
apr_thread_cond_signal(m->added_output);
}
@@ -910,25 +898,20 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
}
else {
h2_ihash_add(m->streams, stream);
- if (stream->response) {
- /* already have a respone, schedule for submit */
+ if (h2_stream_is_ready(stream)) {
h2_ihash_add(m->sready, stream);
}
else {
- h2_beam_create(&stream->input, stream->pool, stream->id,
- "input", 0);
if (!m->need_registration) {
m->need_registration = h2_iq_empty(m->q);
}
if (m->workers_busy < m->workers_max) {
do_registration = m->need_registration;
}
- h2_iq_add(m->q, stream->id, cmp, ctx);
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%ld-%d): process, body=%d",
- m->c->id, stream->id, stream->request->body);
+ h2_iq_add(m->q, stream->id, cmp, ctx);
}
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ "h2_mplx(%ld-%d): process", m->c->id, stream->id);
}
leave_mutex(m, acquired);
}
@@ -939,7 +922,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
return status;
}
-static h2_task *pop_task(h2_mplx *m)
+static h2_task *next_stream_task(h2_mplx *m)
{
h2_task *task = NULL;
h2_stream *stream;
@@ -957,13 +940,13 @@ static h2_task *pop_task(h2_mplx *m)
slave = *pslave;
}
else {
- slave = h2_slave_create(m->c, m->pool, NULL);
+ slave = h2_slave_create(m->c, stream->id, m->pool, NULL);
new_conn = 1;
}
slave->sbh = m->c->sbh;
slave->aborted = 0;
- task = h2_task_create(slave, stream->request, stream->input, m);
+ task = h2_task_create(slave, stream->id, stream->request, stream->input, m);
h2_ihash_add(m->tasks, task);
m->c->keepalives++;
@@ -1003,7 +986,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
*has_more = 0;
}
else {
- task = pop_task(m);
+ task = next_stream_task(m);
*has_more = !h2_iq_empty(m->q);
}
@@ -1022,9 +1005,6 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
* and the original worker has finished. That means the
* engine may start processing now. */
h2_task_thaw(task);
- /* we do not want the task to block on writing response
- * bodies into the mplx. */
- h2_task_set_io_blocking(task, 0);
apr_thread_cond_broadcast(m->task_thawed);
return;
}
@@ -1141,7 +1121,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
--m->workers_busy;
if (ptask) {
/* caller wants another task */
- *ptask = pop_task(m);
+ *ptask = next_stream_task(m);
}
leave_mutex(m, acquired);
}
@@ -1154,15 +1134,19 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
static int latest_repeatable_unsubmitted_iter(void *data, void *val)
{
task_iter_ctx *ctx = data;
+ h2_stream *stream;
h2_task *task = val;
if (!task->worker_done && h2_task_can_redo(task)
&& !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) {
- /* this task occupies a worker, the response has not been submitted yet,
- * not been cancelled and it is a repeatable request
- * -> it can be re-scheduled later */
- if (!ctx->task || ctx->task->started_at < task->started_at) {
- /* we did not have one or this one was started later */
- ctx->task = task;
+ stream = h2_ihash_get(ctx->m->streams, task->stream_id);
+ if (stream && !h2_stream_is_ready(stream)) {
+ /* this task occupies a worker, the response has not been submitted
+ * yet, not been cancelled and it is a repeatable request
+ * -> it can be re-scheduled later */
+ if (!ctx->task || ctx->task->started_at < task->started_at) {
+ /* we did not have one or this one was started later */
+ ctx->task = task;
+ }
}
}
return 1;
@@ -1329,13 +1313,12 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
return APR_ECONNABORTED;
}
m = task->mplx;
- task->r = r;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
if (stream) {
- status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
+ status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
}
else {
status = APR_ECONNABORTED;
@@ -1353,7 +1336,6 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
apr_status_t status;
- h2_task *task = NULL;
int acquired;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
@@ -1368,22 +1350,21 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
* had and, if not, wait a short while before doing the
* blocking, and if unsuccessful, terminating read.
*/
- status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
+ status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
if (APR_STATUS_IS_EAGAIN(status)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): start block engine pull", m->id);
apr_thread_cond_timedwait(m->task_thawed, m->lock,
apr_time_from_msec(20));
- status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
+ status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
}
}
else {
- status = h2_ngn_shed_pull_task(shed, ngn, capacity,
- want_shutdown, &task);
+ status = h2_ngn_shed_pull_request(shed, ngn, capacity,
+ want_shutdown, pr);
}
leave_mutex(m, acquired);
}
- *pr = task? task->r : NULL;
return status;
}
@@ -1423,14 +1404,12 @@ static int update_window(void *ctx, void *val)
apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
stream_ev_callback *on_resume,
- stream_ev_callback *on_response,
void *on_ctx)
{
apr_status_t status;
int acquired;
int streams[32];
h2_stream *stream;
- h2_task *task;
size_t i, n;
AP_DEBUG_ASSERT(m);
@@ -1440,8 +1419,7 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
/* update input windows for streams */
h2_ihash_iter(m->streams, update_window, m);
-
- if (on_response && !h2_ihash_empty(m->sready)) {
+ if (on_resume && !h2_ihash_empty(m->sready)) {
n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
for (i = 0; i < n; ++i) {
stream = h2_ihash_get(m->streams, streams[i]);
@@ -1449,49 +1427,9 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
continue;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
- "h2_mplx(%ld-%d): on_response",
- m->id, stream->id);
- task = h2_ihash_get(m->tasks, stream->id);
- if (task) {
- task->response_sent = 1;
- if (task->rst_error) {
- h2_stream_rst(stream, task->rst_error);
- }
- else {
- AP_DEBUG_ASSERT(task->response);
- status = h2_stream_add_response(stream, task->response,
- task->output.beam);
- if (status != APR_SUCCESS) {
- h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
- }
- if (!h2_response_get_final(task->response)) {
- /* the final response needs still to arrive */
- task->response = NULL;
- }
- }
- }
- else {
- /* We have the stream ready without a task. This happens
- * when we fail streams early. A response should already
- * be present. */
- AP_DEBUG_ASSERT(stream->response || stream->rst_error);
- }
- status = on_response(on_ctx, stream->id);
- }
- }
-
- if (on_resume && !h2_ihash_empty(m->sresume)) {
- n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams));
- for (i = 0; i < n; ++i) {
- stream = h2_ihash_get(m->streams, streams[i]);
- if (!stream) {
- continue;
- }
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
"h2_mplx(%ld-%d): on_resume",
m->id, stream->id);
- h2_stream_set_suspended(stream, 0);
- status = on_resume(on_ctx, stream->id);
+ on_resume(on_ctx, stream->id);
}
}
@@ -1500,25 +1438,36 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
return status;
}
-apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
+apr_status_t h2_mplx_keep_active(h2_mplx *m, apr_uint32_t stream_id)
{
apr_status_t status;
- h2_stream *stream;
- h2_task *task;
int acquired;
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- stream = h2_ihash_get(m->streams, stream_id);
- if (stream && !h2_ihash_get(m->sresume, stream->id)) {
- /* not marked for resume again already */
- h2_stream_set_suspended(stream, 1);
- task = h2_ihash_get(m->tasks, stream->id);
- if (stream->started && (!task || task->worker_done)) {
- h2_ihash_add(m->sresume, stream);
- }
+ h2_stream *s = h2_ihash_get(m->streams, stream_id);
+ if (s) {
+ h2_ihash_add(m->sready, s);
}
leave_mutex(m, acquired);
}
return status;
}
+
+int h2_mplx_awaits_data(h2_mplx *m)
+{
+ apr_status_t status;
+ int acquired, waiting = 1;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ if (h2_ihash_empty(m->streams)) {
+ waiting = 0;
+ }
+ if (h2_iq_empty(m->q) && h2_ihash_empty(m->tasks)) {
+ waiting = 0;
+ }
+ leave_mutex(m, acquired);
+ }
+ return waiting;
+}
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h
index 229518cb21..308facd895 100644
--- a/modules/http2/h2_mplx.h
+++ b/modules/http2/h2_mplx.h
@@ -40,7 +40,6 @@ struct apr_thread_cond_t;
struct h2_bucket_beam;
struct h2_config;
struct h2_ihash_t;
-struct h2_response;
struct h2_task;
struct h2_stream;
struct h2_request;
@@ -76,9 +75,8 @@ struct h2_mplx {
struct h2_ihash_t *spurge; /* all streams done, ready for destroy */
struct h2_iqueue *q; /* all stream ids that need to be started */
- struct h2_ihash_t *sready; /* all streams ready for response */
- struct h2_ihash_t *sresume; /* all streams that can be resumed */
-
+ struct h2_ihash_t *sready; /* all streams ready for output */
+
struct h2_ihash_t *tasks; /* all tasks started and not destroyed */
struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */
@@ -164,6 +162,8 @@ int h2_mplx_is_busy(h2_mplx *m);
* IO lifetime of streams.
******************************************************************************/
+struct h2_stream *h2_mplx_stream_get(h2_mplx *m, apr_uint32_t id);
+
/**
* Notifies mplx that a stream has finished processing.
*
@@ -181,6 +181,8 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream);
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
struct apr_thread_cond_t *iowait);
+apr_status_t h2_mplx_keep_active(h2_mplx *m, apr_uint32_t stream_id);
+
/*******************************************************************************
* Stream processing.
******************************************************************************/
@@ -222,16 +224,15 @@ typedef apr_status_t stream_ev_callback(void *ctx, int stream_id);
/**
* Dispatch events for the master connection, such as
- * - resume: new output data has arrived for a suspended stream
- * - response: the response for a stream is ready
+ ± @param m the multiplexer
+ * @param on_resume new output data has arrived for a suspended stream
+ * @param ctx user supplied argument to invocation.
*/
apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
stream_ev_callback *on_resume,
- stream_ev_callback *on_response,
void *ctx);
-apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id);
-
+int h2_mplx_awaits_data(h2_mplx *m);
typedef int h2_mplx_stream_cb(struct h2_stream *s, void *ctx);
@@ -245,7 +246,7 @@ apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx);
* Opens the output for the given stream with the specified response.
*/
apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,
- struct h2_response *response);
+ struct h2_bucket_beam *beam);
/*******************************************************************************
* h2_mplx list Manipulation.
diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c
index 14e57a7aa3..6be6d24d71 100644
--- a/modules/http2/h2_ngn_shed.c
+++ b/modules/http2/h2_ngn_shed.c
@@ -35,7 +35,6 @@
#include "h2_ctx.h"
#include "h2_h2.h"
#include "h2_mplx.h"
-#include "h2_response.h"
#include "h2_request.h"
#include "h2_task.h"
#include "h2_util.h"
@@ -46,6 +45,7 @@ typedef struct h2_ngn_entry h2_ngn_entry;
struct h2_ngn_entry {
APR_RING_ENTRY(h2_ngn_entry) link;
h2_task *task;
+ request_rec *r;
};
#define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link)
@@ -144,26 +144,28 @@ void h2_ngn_shed_abort(h2_ngn_shed *shed)
shed->aborted = 1;
}
-static void ngn_add_task(h2_req_engine *ngn, h2_task *task)
+static void ngn_add_task(h2_req_engine *ngn, h2_task *task, request_rec *r)
{
h2_ngn_entry *entry = apr_pcalloc(task->pool, sizeof(*entry));
APR_RING_ELEM_INIT(entry, link);
entry->task = task;
+ entry->r = r;
H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
}
-apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type,
- h2_task *task, http2_req_engine_init *einit)
+apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
+ request_rec *r,
+ http2_req_engine_init *einit)
{
h2_req_engine *ngn;
+ h2_task *task = h2_ctx_rget_task(r);
- AP_DEBUG_ASSERT(shed);
-
+ ap_assert(task);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id,
task->id);
- if (task->ser_headers) {
+ if (task->request->serialize) {
/* Max compatibility, deny processing of this */
return APR_EOF;
}
@@ -184,7 +186,7 @@ apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type,
if (!h2_task_is_detached(task)) {
h2_task_freeze(task);
}
- ngn_add_task(ngn, task);
+ ngn_add_task(ngn, task, r);
ngn->no_assigned++;
return APR_SUCCESS;
}
@@ -207,7 +209,7 @@ apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type,
APR_RING_INIT(&newngn->entries, h2_ngn_entry, link);
status = einit(newngn, newngn->id, newngn->type, newngn->pool,
- shed->req_buffer_size, task->r,
+ shed->req_buffer_size, r,
&newngn->out_consumed, &newngn->out_consumed_ctx);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03395)
"h2_ngn_shed(%ld): create engine %s (%s)",
@@ -242,16 +244,16 @@ static h2_ngn_entry *pop_detached(h2_req_engine *ngn)
return NULL;
}
-apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed,
- h2_req_engine *ngn,
- apr_uint32_t capacity,
- int want_shutdown,
- h2_task **ptask)
+apr_status_t h2_ngn_shed_pull_request(h2_ngn_shed *shed,
+ h2_req_engine *ngn,
+ apr_uint32_t capacity,
+ int want_shutdown,
+ request_rec **pr)
{
h2_ngn_entry *entry;
AP_DEBUG_ASSERT(ngn);
- *ptask = NULL;
+ *pr = NULL;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, APLOGNO(03396)
"h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d",
shed->c->id, ngn->id, want_shutdown);
@@ -279,7 +281,7 @@ apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed,
"h2_ngn_shed(%ld): pulled request %s for engine %s",
shed->c->id, entry->task->id, ngn->id);
ngn->no_live++;
- *ptask = entry->task;
+ *pr = entry->r;
entry->task->assigned = ngn;
/* task will now run in ngn's own thread. Modules like lua
* seem to require the correct thread set in the conn_rec.
diff --git a/modules/http2/h2_ngn_shed.h b/modules/http2/h2_ngn_shed.h
index 832dbd3a8e..1f61466a5c 100644
--- a/modules/http2/h2_ngn_shed.h
+++ b/modules/http2/h2_ngn_shed.h
@@ -58,13 +58,13 @@ h2_ngn_shed *h2_ngn_shed_get_shed(struct h2_req_engine *ngn);
void h2_ngn_shed_abort(h2_ngn_shed *shed);
-apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type,
- struct h2_task *task,
- h2_shed_ngn_init *init_cb);
+apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
+ request_rec *r,
+ h2_shed_ngn_init *init_cb);
-apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, h2_req_engine *pub_ngn,
- apr_uint32_t capacity,
- int want_shutdown, struct h2_task **ptask);
+apr_status_t h2_ngn_shed_pull_request(h2_ngn_shed *shed, h2_req_engine *pub_ngn,
+ apr_uint32_t capacity,
+ int want_shutdown, request_rec **pr);
apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed,
struct h2_req_engine *ngn,
diff --git a/modules/http2/h2_proxy_util.c b/modules/http2/h2_proxy_util.c
index 040671172c..39e139d5c8 100644
--- a/modules/http2/h2_proxy_util.c
+++ b/modules/http2/h2_proxy_util.c
@@ -566,7 +566,6 @@ static h2_request *h2_req_createn(int id, apr_pool_t *pool, const char *method,
{
h2_request *req = apr_pcalloc(pool, sizeof(h2_request));
- req->id = id;
req->method = method;
req->scheme = scheme;
req->authority = authority;
diff --git a/modules/http2/h2_push.c b/modules/http2/h2_push.c
index df5632af1e..042042704e 100644
--- a/modules/http2/h2_push.c
+++ b/modules/http2/h2_push.c
@@ -34,7 +34,7 @@
#include "h2_util.h"
#include "h2_push.h"
#include "h2_request.h"
-#include "h2_response.h"
+#include "h2_headers.h"
#include "h2_session.h"
#include "h2_stream.h"
@@ -58,6 +58,7 @@ static const char *policy_str(h2_push_policy policy)
typedef struct {
const h2_request *req;
+ int push_policy;
apr_pool_t *pool;
apr_array_header_t *pushes;
const char *s;
@@ -336,7 +337,7 @@ static int add_push(link_ctx *ctx)
*/
path = apr_uri_unparse(ctx->pool, &uri, APR_URI_UNP_OMITSITEPART);
push = apr_pcalloc(ctx->pool, sizeof(*push));
- switch (ctx->req->push_policy) {
+ switch (ctx->push_policy) {
case H2_PUSH_HEAD:
method = "HEAD";
break;
@@ -350,7 +351,7 @@ static int add_push(link_ctx *ctx)
ctx->req->authority, path, headers,
ctx->req->serialize);
/* atm, we do not push on pushes */
- h2_request_end_headers(req, ctx->pool, 1, 0);
+ h2_request_end_headers(req, ctx->pool, 1);
push->req = req;
if (!ctx->pushes) {
@@ -427,10 +428,10 @@ static int head_iter(void *ctx, const char *key, const char *value)
return 1;
}
-apr_array_header_t *h2_push_collect(apr_pool_t *p, const h2_request *req,
- const h2_response *res)
+apr_array_header_t *h2_push_collect(apr_pool_t *p, const h2_request *req,
+ int push_policy, const h2_headers *res)
{
- if (req && req->push_policy != H2_PUSH_NONE) {
+ if (req && push_policy != H2_PUSH_NONE) {
/* Collect push candidates from the request/response pair.
*
* One source for pushes are "rel=preload" link headers
@@ -444,11 +445,13 @@ apr_array_header_t *h2_push_collect(apr_pool_t *p, const h2_request *req,
memset(&ctx, 0, sizeof(ctx));
ctx.req = req;
+ ctx.push_policy = push_policy;
ctx.pool = p;
apr_table_do(head_iter, &ctx, res->headers, NULL);
if (ctx.pushes) {
- apr_table_setn(res->headers, "push-policy", policy_str(req->push_policy));
+ apr_table_setn(res->headers, "push-policy",
+ policy_str(push_policy));
}
return ctx.pushes;
}
@@ -681,7 +684,7 @@ apr_array_header_t *h2_push_diary_update(h2_session *session, apr_array_header_t
apr_array_header_t *h2_push_collect_update(h2_stream *stream,
const struct h2_request *req,
- const struct h2_response *res)
+ const struct h2_headers *res)
{
h2_session *session = stream->session;
const char *cache_digest = apr_table_get(req->headers, "Cache-Digest");
@@ -698,7 +701,7 @@ apr_array_header_t *h2_push_collect_update(h2_stream *stream,
session->id, cache_digest);
}
}
- pushes = h2_push_collect(stream->pool, req, res);
+ pushes = h2_push_collect(stream->pool, req, stream->push_policy, res);
return h2_push_diary_update(stream->session, pushes);
}
diff --git a/modules/http2/h2_push.h b/modules/http2/h2_push.h
index ae1ff6281a..bfe204f5a4 100644
--- a/modules/http2/h2_push.h
+++ b/modules/http2/h2_push.h
@@ -18,7 +18,7 @@
#include "h2.h"
struct h2_request;
-struct h2_response;
+struct h2_headers;
struct h2_ngheader;
struct h2_session;
struct h2_stream;
@@ -58,7 +58,8 @@ struct h2_push_diary {
*/
apr_array_header_t *h2_push_collect(apr_pool_t *p,
const struct h2_request *req,
- const struct h2_response *res);
+ int push_policy,
+ const struct h2_headers *res);
/**
* Create a new push diary for the given maximum number of entries.
@@ -81,7 +82,7 @@ apr_array_header_t *h2_push_diary_update(struct h2_session *session, apr_array_h
*/
apr_array_header_t *h2_push_collect_update(struct h2_stream *stream,
const struct h2_request *req,
- const struct h2_response *res);
+ const struct h2_headers *res);
/**
* Get a cache digest as described in
* https://datatracker.ietf.org/doc/draft-kazuho-h2-cache-digest/
diff --git a/modules/http2/h2_request.c b/modules/http2/h2_request.c
index 63dceb9d55..d2d7df472c 100644
--- a/modules/http2/h2_request.c
+++ b/modules/http2/h2_request.c
@@ -36,13 +36,6 @@
#include "h2_util.h"
-static apr_status_t inspect_clen(h2_request *req, const char *s)
-{
- char *end;
- req->content_length = apr_strtoi64(s, &end, 10);
- return (s == end)? APR_EINVAL : APR_SUCCESS;
-}
-
typedef struct {
apr_table_t *headers;
apr_pool_t *pool;
@@ -59,7 +52,6 @@ static int set_h1_header(void *ctx, const char *key, const char *value)
}
apr_status_t h2_request_rcreate(h2_request **preq, apr_pool_t *pool,
- int stream_id, int initiated_on,
request_rec *r)
{
h2_request *req;
@@ -86,8 +78,6 @@ apr_status_t h2_request_rcreate(h2_request **preq, apr_pool_t *pool,
}
req = apr_pcalloc(pool, sizeof(*req));
- req->id = stream_id;
- req->initiated_on = initiated_on;
req->method = apr_pstrdup(pool, r->method);
req->scheme = scheme;
req->authority = authority;
@@ -121,8 +111,7 @@ apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool,
if (!apr_is_empty_table(req->headers)) {
ap_log_perror(APLOG_MARK, APLOG_ERR, 0, pool,
APLOGNO(02917)
- "h2_request(%d): pseudo header after request start",
- req->id);
+ "h2_request: pseudo header after request start");
return APR_EGENERAL;
}
@@ -148,8 +137,8 @@ apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool,
strncpy(buffer, name, (nlen > 31)? 31 : nlen);
ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, pool,
APLOGNO(02954)
- "h2_request(%d): ignoring unknown pseudo header %s",
- req->id, buffer);
+ "h2_request: ignoring unknown pseudo header %s",
+ buffer);
}
}
else {
@@ -160,8 +149,7 @@ apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool,
return status;
}
-apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool,
- int eos, int push)
+apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos)
{
const char *s;
@@ -181,21 +169,9 @@ apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool,
}
s = apr_table_get(req->headers, "Content-Length");
- if (s) {
- if (inspect_clen(req, s) != APR_SUCCESS) {
- ap_log_perror(APLOG_MARK, APLOG_WARNING, APR_EINVAL, pool,
- APLOGNO(02959)
- "h2_request(%d): content-length value not parsed: %s",
- req->id, s);
- return APR_EINVAL;
- }
- req->body = 1;
- }
- else {
+ if (!s) {
/* no content-length given */
- req->content_length = -1;
- req->body = !eos;
- if (req->body) {
+ if (!eos) {
/* We have not seen a content-length and have no eos,
* simulate a chunked encoding for our HTTP/1.1 infrastructure,
* in case we have "H2SerializeHeaders on" here
@@ -204,67 +180,16 @@ apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool,
apr_table_mergen(req->headers, "Transfer-Encoding", "chunked");
}
else if (apr_table_get(req->headers, "Content-Type")) {
- /* If we have a content-type, but already see eos, no more
+ /* If we have a content-type, but already seen eos, no more
* data will come. Signal a zero content length explicitly.
*/
apr_table_setn(req->headers, "Content-Length", "0");
}
}
- h2_push_policy_determine(req, pool, push);
-
- /* In the presence of trailers, force behaviour of chunked encoding */
- s = apr_table_get(req->headers, "Trailer");
- if (s && s[0]) {
- req->trailers = apr_table_make(pool, 5);
- if (!req->chunked) {
- req->chunked = 1;
- apr_table_mergen(req->headers, "Transfer-Encoding", "chunked");
- }
- }
-
- return APR_SUCCESS;
-}
-
-static apr_status_t add_h1_trailer(h2_request *req, apr_pool_t *pool,
- const char *name, size_t nlen,
- const char *value, size_t vlen)
-{
- char *hname, *hvalue;
-
- if (h2_req_ignore_trailer(name, nlen)) {
- return APR_SUCCESS;
- }
-
- hname = apr_pstrndup(pool, name, nlen);
- hvalue = apr_pstrndup(pool, value, vlen);
- h2_util_camel_case_header(hname, nlen);
-
- apr_table_mergen(req->trailers, hname, hvalue);
-
return APR_SUCCESS;
}
-
-apr_status_t h2_request_add_trailer(h2_request *req, apr_pool_t *pool,
- const char *name, size_t nlen,
- const char *value, size_t vlen)
-{
- if (!req->trailers) {
- ap_log_perror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, pool, APLOGNO(03059)
- "h2_request(%d): unanounced trailers",
- req->id);
- return APR_EINVAL;
- }
- if (nlen == 0 || name[0] == ':') {
- ap_log_perror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, pool, APLOGNO(03060)
- "h2_request(%d): pseudo header in trailer",
- req->id);
- return APR_EINVAL;
- }
- return add_h1_trailer(req, pool, name, nlen, value, vlen);
-}
-
h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src)
{
h2_request *dst = apr_pmemdup(p, src, sizeof(*dst));
@@ -273,9 +198,6 @@ h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src)
dst->authority = apr_pstrdup(p, src->authority);
dst->path = apr_pstrdup(p, src->path);
dst->headers = apr_table_clone(p, src->headers);
- if (src->trailers) {
- dst->trailers = apr_table_clone(p, src->trailers);
- }
return dst;
}
@@ -346,8 +268,8 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *c)
* request for a vhost where h2 is disabled --> 421.
*/
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03367)
- "h2_request(%d): access_status=%d, request_create failed",
- req->id, access_status);
+ "h2_request: access_status=%d, request_create failed",
+ access_status);
ap_die(access_status, r);
ap_update_child_status(c->sbh, SERVER_BUSY_LOG, r);
ap_run_log_transaction(r);
diff --git a/modules/http2/h2_request.h b/modules/http2/h2_request.h
index 4a3f3ca285..faf9791194 100644
--- a/modules/http2/h2_request.h
+++ b/modules/http2/h2_request.h
@@ -19,7 +19,6 @@
#include "h2.h"
apr_status_t h2_request_rcreate(h2_request **preq, apr_pool_t *pool,
- int stream_id, int initiated_on,
request_rec *r);
apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool,
@@ -30,8 +29,7 @@ apr_status_t h2_request_add_trailer(h2_request *req, apr_pool_t *pool,
const char *name, size_t nlen,
const char *value, size_t vlen);
-apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool,
- int eos, int push);
+apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos);
h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src);
diff --git a/modules/http2/h2_response.c b/modules/http2/h2_response.c
deleted file mode 100644
index 85599dbcf5..0000000000
--- a/modules/http2/h2_response.c
+++ /dev/null
@@ -1,219 +0,0 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <assert.h>
-#include <stdio.h>
-
-#include <apr_strings.h>
-
-#include <httpd.h>
-#include <http_core.h>
-#include <http_log.h>
-#include <util_time.h>
-
-#include <nghttp2/nghttp2.h>
-
-#include "h2_private.h"
-#include "h2_filter.h"
-#include "h2_h2.h"
-#include "h2_util.h"
-#include "h2_request.h"
-#include "h2_response.h"
-
-
-static apr_table_t *parse_headers(apr_array_header_t *hlines, apr_pool_t *pool)
-{
- if (hlines) {
- apr_table_t *headers = apr_table_make(pool, hlines->nelts);
- int i;
-
- for (i = 0; i < hlines->nelts; ++i) {
- char *hline = ((char **)hlines->elts)[i];
- char *sep = ap_strchr(hline, ':');
- if (!sep) {
- ap_log_perror(APLOG_MARK, APLOG_WARNING, APR_EINVAL, pool,
- APLOGNO(02955) "h2_response: invalid header[%d] '%s'",
- i, (char*)hline);
- /* not valid format, abort */
- return NULL;
- }
- (*sep++) = '\0';
- while (*sep == ' ' || *sep == '\t') {
- ++sep;
- }
-
- if (!h2_util_ignore_header(hline)) {
- apr_table_merge(headers, hline, sep);
- }
- }
- return headers;
- }
- else {
- return apr_table_make(pool, 0);
- }
-}
-
-static const char *get_sos_filter(apr_table_t *notes)
-{
- return notes? apr_table_get(notes, H2_RESP_SOS_NOTE) : NULL;
-}
-
-static void check_clen(h2_response *response, request_rec *r, apr_pool_t *pool)
-{
-
- if (r && r->header_only) {
- response->content_length = 0;
- }
- else if (response->headers) {
- const char *s = apr_table_get(response->headers, "Content-Length");
- if (s) {
- char *end;
- response->content_length = apr_strtoi64(s, &end, 10);
- if (s == end) {
- ap_log_perror(APLOG_MARK, APLOG_WARNING, APR_EINVAL,
- pool, APLOGNO(02956)
- "h2_response: content-length"
- " value not parsed: %s", s);
- response->content_length = -1;
- }
- }
- }
-}
-
-static h2_response *h2_response_create_int(int stream_id,
- int rst_error,
- int http_status,
- apr_table_t *headers,
- apr_table_t *notes,
- apr_pool_t *pool)
-{
- h2_response *response;
-
- if (!headers) {
- return NULL;
- }
-
- response = apr_pcalloc(pool, sizeof(h2_response));
- if (response == NULL) {
- return NULL;
- }
-
- response->stream_id = stream_id;
- response->rst_error = rst_error;
- response->http_status = http_status? http_status : 500;
- response->content_length = -1;
- response->headers = headers;
- response->sos_filter = get_sos_filter(notes);
-
- check_clen(response, NULL, pool);
- return response;
-}
-
-
-h2_response *h2_response_create(int stream_id,
- int rst_error,
- int http_status,
- apr_array_header_t *hlines,
- apr_table_t *notes,
- apr_pool_t *pool)
-{
- return h2_response_create_int(stream_id, rst_error, http_status,
- parse_headers(hlines, pool), notes, pool);
-}
-
-h2_response *h2_response_rcreate(int stream_id, request_rec *r, int status,
- apr_table_t *header, apr_pool_t *pool)
-{
- h2_response *response = apr_pcalloc(pool, sizeof(h2_response));
- if (response == NULL) {
- return NULL;
- }
-
- response->stream_id = stream_id;
- response->http_status = status;
- response->content_length = -1;
- response->headers = header? header : apr_table_make(pool, 5);
- response->sos_filter = get_sos_filter(r->notes);
-
- check_clen(response, r, pool);
-
- if (response->http_status == HTTP_FORBIDDEN) {
- const char *cause = apr_table_get(r->notes, "ssl-renegotiate-forbidden");
- if (cause) {
- /* This request triggered a TLS renegotiation that is now allowed
- * in HTTP/2. Tell the client that it should use HTTP/1.1 for this.
- */
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, response->http_status, r,
- APLOGNO(03061)
- "h2_response(%ld-%d): renegotiate forbidden, cause: %s",
- (long)r->connection->id, stream_id, cause);
- response->rst_error = H2_ERR_HTTP_1_1_REQUIRED;
- }
- }
-
- return response;
-}
-
-h2_response *h2_response_die(int stream_id, apr_status_t type,
- const struct h2_request *req, apr_pool_t *pool)
-{
- apr_table_t *headers = apr_table_make(pool, 5);
- char *date = NULL;
- int status = (type >= 200 && type < 600)? type : 500;
-
- date = apr_palloc(pool, APR_RFC822_DATE_LEN);
- ap_recent_rfc822_date(date, req? req->request_time : apr_time_now());
- apr_table_setn(headers, "Date", date);
- apr_table_setn(headers, "Server", ap_get_server_banner());
-
- return h2_response_create_int(stream_id, 0, status, headers, NULL, pool);
-}
-
-h2_response *h2_response_clone(apr_pool_t *pool, h2_response *from)
-{
- h2_response *to = apr_pcalloc(pool, sizeof(h2_response));
-
- to->stream_id = from->stream_id;
- to->http_status = from->http_status;
- to->content_length = from->content_length;
- to->sos_filter = from->sos_filter;
- if (from->headers) {
- to->headers = apr_table_clone(pool, from->headers);
- }
- if (from->trailers) {
- to->trailers = apr_table_clone(pool, from->trailers);
- }
- return to;
-}
-
-void h2_response_set_trailers(h2_response *response, apr_table_t *trailers)
-{
- response->trailers = trailers;
-}
-
-int h2_response_is_final(h2_response *response)
-{
- return response->http_status >= 200;
-}
-
-h2_response *h2_response_get_final(h2_response *response)
-{
- for (/**/; response; response = response->next) {
- if (h2_response_is_final(response)) {
- return response;
- }
- }
- return NULL;
-}
diff --git a/modules/http2/h2_response.h b/modules/http2/h2_response.h
deleted file mode 100644
index bc2fdb936e..0000000000
--- a/modules/http2/h2_response.h
+++ /dev/null
@@ -1,76 +0,0 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __mod_h2__h2_response__
-#define __mod_h2__h2_response__
-
-#include "h2.h"
-
-/**
- * Create the response from the status and parsed header lines.
- * @param stream_id id of the stream to create the response for
- * @param rst_error error for reset or 0
- * @param http_status http status code of response
- * @param hlines the text lines of the response header
- * @param pool the memory pool to use
- */
-h2_response *h2_response_create(int stream_id,
- int rst_error,
- int http_status,
- apr_array_header_t *hlines,
- apr_table_t *notes,
- apr_pool_t *pool);
-
-/**
- * Create the response from the given request_rec.
- * @param stream_id id of the stream to create the response for
- * @param r the request record which was processed
- * @param header the headers of the response
- * @param pool the memory pool to use
- */
-h2_response *h2_response_rcreate(int stream_id, request_rec *r, int status,
- apr_table_t *header, apr_pool_t *pool);
-
-/**
- * Create the response for the given error.
- * @param stream_id id of the stream to create the response for
- * @param type the error code
- * @param req the original h2_request
- * @param pool the memory pool to use
- */
-h2_response *h2_response_die(int stream_id, apr_status_t type,
- const struct h2_request *req, apr_pool_t *pool);
-
-/**
- * Deep copies the response into a new pool.
- * @param pool the pool to use for the clone
- * @param from the response to clone
- * @return the cloned response
- */
-h2_response *h2_response_clone(apr_pool_t *pool, h2_response *from);
-
-/**
- * Set the trailers in the response. Will replace any existing trailers. Will
- * *not* clone the table.
- *
- * @param response the repsone to set the trailers for
- * @param trailers the trailers to set
- */
-void h2_response_set_trailers(h2_response *response, apr_table_t *trailers);
-
-int h2_response_is_final(h2_response *response);
-h2_response *h2_response_get_final(h2_response *response);
-
-#endif /* defined(__mod_h2__h2_response__) */
diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c
index cf29a0281d..a941ee0b80 100644
--- a/modules/http2/h2_session.c
+++ b/modules/http2/h2_session.c
@@ -38,9 +38,8 @@
#include "h2_mplx.h"
#include "h2_push.h"
#include "h2_request.h"
-#include "h2_response.h"
+#include "h2_headers.h"
#include "h2_stream.h"
-#include "h2_from_h1.h"
#include "h2_task.h"
#include "h2_session.h"
#include "h2_util.h"
@@ -407,7 +406,7 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
status = h2_stream_add_header(stream, (const char *)name, namelen,
(const char *)value, valuelen);
- if (status != APR_SUCCESS && !stream->response) {
+ if (status != APR_SUCCESS && !h2_stream_is_ready(stream)) {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
return 0;
@@ -1138,6 +1137,10 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
return status;
}
+static apr_status_t on_stream_headers(h2_session *session, h2_stream *stream,
+ h2_headers *headers, apr_off_t len,
+ int eos);
+
static ssize_t stream_data_cb(nghttp2_session *ng2s,
int32_t stream_id,
uint8_t *buf,
@@ -1171,8 +1174,8 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
session->id, (int)stream_id);
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
-
- status = h2_stream_out_prepare(stream, &nread, &eos);
+
+ status = h2_stream_out_prepare(stream, &nread, &eos, NULL);
if (nread) {
*data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
}
@@ -1191,7 +1194,6 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
* it. Remember at our h2_stream that we need to do this.
*/
nread = 0;
- h2_mplx_suspend_stream(session->mplx, stream->id);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071)
"h2_stream(%ld-%d): suspending",
session->id, (int)stream_id);
@@ -1206,25 +1208,8 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
}
if (eos) {
- apr_table_t *trailers = h2_stream_get_trailers(stream);
- if (trailers && !apr_is_empty_table(trailers)) {
- h2_ngheader *nh;
- int rv;
-
- nh = h2_util_ngheader_make(stream->pool, trailers);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03072)
- "h2_stream(%ld-%d): submit %d trailers",
- session->id, (int)stream_id,(int) nh->nvlen);
- rv = nghttp2_submit_trailer(ng2s, stream->id, nh->nv, nh->nvlen);
- if (rv < 0) {
- nread = rv;
- }
- *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
- }
-
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
}
-
return (ssize_t)nread;
}
@@ -1423,83 +1408,56 @@ static apr_status_t h2_session_send(h2_session *session)
}
/**
- * A stream was resumed as new output data arrived.
+ * headers for the stream are ready.
*/
-static apr_status_t on_stream_resume(void *ctx, int stream_id)
+static apr_status_t on_stream_headers(h2_session *session, h2_stream *stream,
+ h2_headers *headers, apr_off_t len,
+ int eos)
{
- h2_session *session = ctx;
- h2_stream *stream = get_stream(session, stream_id);
apr_status_t status = APR_SUCCESS;
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): on_resume", session->id, stream_id);
- if (stream) {
- int rv;
- if (stream->rst_error) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03466)
- "h2_stream(%ld-%d): RST_STREAM, err=%d",
- session->id, stream->id, stream->rst_error);
- rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
- stream->id, stream->rst_error);
- }
- else {
- rv = nghttp2_session_resume_data(session->ngh2, stream_id);
- }
- session->have_written = 1;
- ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
- APLOG_ERR : APLOG_DEBUG, 0, session->c,
- APLOGNO(02936)
- "h2_stream(%ld-%d): resuming %s",
- session->id, stream->id, rv? nghttp2_strerror(rv) : "");
- }
- return status;
-}
-
-/**
- * A response for the stream is ready.
- */
-static apr_status_t on_stream_response(void *ctx, int stream_id)
-{
- h2_session *session = ctx;
- h2_stream *stream = get_stream(session, stream_id);
- apr_status_t status = APR_SUCCESS;
- h2_response *response;
int rv = 0;
- AP_DEBUG_ASSERT(session);
+ ap_assert(session);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): on_response", session->id, stream_id);
- if (!stream) {
- return APR_NOTFOUND;
- }
- else if (!stream->response) {
+ "h2_stream(%ld-%d): on_headers", session->id, stream->id);
+ if (!headers) {
int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03466)
"h2_stream(%ld-%d): RST_STREAM, err=%d",
session->id, stream->id, err);
-
rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
stream->id, err);
goto leave;
}
-
- while ((response = h2_stream_get_unsent_response(stream)) != NULL) {
+ else if (headers->status < 100) {
+ rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+ stream->id, headers->status);
+ goto leave;
+ }
+ else if (stream->has_response) {
+ h2_ngheader *nh;
+ int rv;
+
+ nh = h2_util_ngheader_make(stream->pool, headers->headers);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03072)
+ "h2_stream(%ld-%d): submit %d trailers",
+ session->id, (int)stream->id,(int) nh->nvlen);
+ rv = nghttp2_submit_trailer(session->ngh2, stream->id, nh->nv, nh->nvlen);
+ goto leave;
+ }
+ else {
nghttp2_data_provider provider, *pprovider = NULL;
h2_ngheader *ngh;
+ apr_table_t *hout;
const h2_priority *prio;
-
- if (stream->submitted) {
- rv = NGHTTP2_PROTOCOL_ERROR;
- goto leave;
- }
+ const char *note;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
"h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
- session->id, stream->id, response->http_status,
+ session->id, stream->id, headers->status,
(unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
- if (response->content_length != 0) {
+ if (!eos || len > 0) {
memset(&provider, 0, sizeof(provider));
provider.source.fd = stream->id;
provider.read_callback = stream_data_cb;
@@ -1522,23 +1480,36 @@ static apr_status_t on_stream_response(void *ctx, int stream_id)
* also have the pushed ones as well.
*/
if (!stream->initiated_on
- && h2_response_is_final(response)
- && H2_HTTP_2XX(response->http_status)
+ && h2_headers_are_response(headers)
+ && H2_HTTP_2XX(headers->status)
&& h2_session_push_enabled(session)) {
- h2_stream_submit_pushes(stream);
+ h2_stream_submit_pushes(stream, headers);
}
- prio = h2_stream_get_priority(stream);
+ prio = h2_stream_get_priority(stream, headers);
if (prio) {
h2_session_set_prio(session, stream, prio);
}
- ngh = h2_util_ngheader_make_res(stream->pool, response->http_status,
- response->headers);
- rv = nghttp2_submit_response(session->ngh2, response->stream_id,
+ hout = headers->headers;
+ note = apr_table_get(headers->notes, H2_FILTER_DEBUG_NOTE);
+ if (note && !strcmp("on", note)) {
+ int32_t connFlowIn, connFlowOut;
+
+ connFlowIn = nghttp2_session_get_effective_local_window_size(session->ngh2);
+ connFlowOut = nghttp2_session_get_remote_window_size(session->ngh2);
+ hout = apr_table_clone(stream->pool, hout);
+ apr_table_setn(hout, "conn-flow-in",
+ apr_itoa(stream->pool, connFlowIn));
+ apr_table_setn(hout, "conn-flow-out",
+ apr_itoa(stream->pool, connFlowOut));
+ }
+
+ ngh = h2_util_ngheader_make_res(stream->pool, headers->status, hout);
+ rv = nghttp2_submit_response(session->ngh2, stream->id,
ngh->nv, ngh->nvlen, pprovider);
- stream->submitted = h2_response_is_final(response);
+ stream->has_response = h2_headers_are_response(headers);
session->have_written = 1;
if (stream->initiated_on) {
@@ -1574,6 +1545,48 @@ leave:
return status;
}
+/**
+ * A stream was resumed as new output data arrived.
+ */
+static apr_status_t on_stream_resume(void *ctx, int stream_id)
+{
+ h2_session *session = ctx;
+ h2_stream *stream = get_stream(session, stream_id);
+ apr_status_t status = APR_EAGAIN;
+ int rv;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): on_resume", session->id, stream_id);
+ if (stream) {
+ apr_off_t len = 0;
+ int eos = 0;
+ h2_headers *headers = NULL;
+
+ send_headers:
+ status = h2_stream_out_prepare(stream, &len, &eos, &headers);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
+ "h2_stream(%ld-%d): prepared len=%ld, eos=%d",
+ session->id, stream_id, (long)len, eos);
+ if (headers) {
+ status = on_stream_headers(session, stream, headers, len, eos);
+ if (status != APR_SUCCESS) {
+ return status;
+ }
+ goto send_headers;
+ }
+ else if (status != APR_EAGAIN) {
+ rv = nghttp2_session_resume_data(session->ngh2, stream_id);
+ session->have_written = 1;
+ ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
+ APLOG_ERR : APLOG_DEBUG, 0, session->c,
+ APLOGNO(02936)
+ "h2_stream(%ld-%d): resuming %s",
+ session->id, stream->id, rv? nghttp2_strerror(rv) : "");
+ }
+ }
+ return status;
+}
+
static apr_status_t h2_session_receive(void *ctx, const char *data,
apr_size_t len, apr_size_t *readlen)
{
@@ -1664,40 +1677,6 @@ static apr_status_t h2_session_read(h2_session *session, int block)
return rstatus;
}
-static int unsubmitted_iter(void *ctx, void *val)
-{
- h2_stream *stream = val;
- if (h2_stream_needs_submit(stream)) {
- *((int *)ctx) = 1;
- return 0;
- }
- return 1;
-}
-
-static int has_unsubmitted_streams(h2_session *session)
-{
- int has_unsubmitted = 0;
- h2_ihash_iter(session->streams, unsubmitted_iter, &has_unsubmitted);
- return has_unsubmitted;
-}
-
-static int suspended_iter(void *ctx, void *val)
-{
- h2_stream *stream = val;
- if (h2_stream_is_suspended(stream)) {
- *((int *)ctx) = 1;
- return 0;
- }
- return 1;
-}
-
-static int has_suspended_streams(h2_session *session)
-{
- int has_suspended = 0;
- h2_ihash_iter(session->streams, suspended_iter, &has_suspended);
- return has_suspended;
-}
-
static const char *StateNames[] = {
"INIT", /* H2_SESSION_ST_INIT */
"DONE", /* H2_SESSION_ST_DONE */
@@ -1842,8 +1821,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
session->id, session->open_streams);
h2_conn_io_flush(&session->io);
if (session->open_streams > 0) {
- if (has_unsubmitted_streams(session)
- || has_suspended_streams(session)) {
+ if (h2_mplx_awaits_data(session->mplx)) {
/* waiting for at least one stream to produce data */
transit(session, "no io", H2_SESSION_ST_WAIT);
}
@@ -2207,7 +2185,6 @@ apr_status_t h2_session_process(h2_session *session, int async)
/* trigger window updates, stream resumes and submits */
status = h2_mplx_dispatch_master_events(session->mplx,
on_stream_resume,
- on_stream_response,
session);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h
index e905a390a2..4d21cb86f5 100644
--- a/modules/http2/h2_session.h
+++ b/modules/http2/h2_session.h
@@ -49,7 +49,6 @@ struct h2_mplx;
struct h2_priority;
struct h2_push;
struct h2_push_diary;
-struct h2_response;
struct h2_session;
struct h2_stream;
struct h2_task;
@@ -187,11 +186,6 @@ void h2_session_abort(h2_session *session, apr_status_t reason);
*/
void h2_session_close(h2_session *session);
-/* Start submitting the response to a stream request. This is possible
- * once we have all the response headers. */
-apr_status_t h2_session_handle_response(h2_session *session,
- struct h2_stream *stream);
-
/**
* Create and register a new stream under the given id.
*
diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c
index 7720237b10..35747710cd 100644
--- a/modules/http2/h2_stream.c
+++ b/modules/http2/h2_stream.c
@@ -16,6 +16,8 @@
#include <assert.h>
#include <stddef.h>
+#include <apr_strings.h>
+
#include <httpd.h>
#include <http_core.h>
#include <http_connection.h>
@@ -29,11 +31,10 @@
#include "h2_conn.h"
#include "h2_config.h"
#include "h2_h2.h"
-#include "h2_filter.h"
#include "h2_mplx.h"
#include "h2_push.h"
#include "h2_request.h"
-#include "h2_response.h"
+#include "h2_headers.h"
#include "h2_session.h"
#include "h2_stream.h"
#include "h2_task.h"
@@ -62,8 +63,8 @@ static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, char *tag)
apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
len = h2_util_bb_print(buffer, bmax, tag, "", s->buffer);
- ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%ld-%d): %s",
- c->id, s->id, len? buffer : line);
+ ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%s): %s",
+ c->log_id, len? buffer : line);
}
}
@@ -150,6 +151,23 @@ static int output_open(h2_stream *stream)
}
}
+static void prep_output(h2_stream *stream) {
+ conn_rec *c = stream->session->c;
+ if (!stream->buffer) {
+ stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+ }
+}
+
+static void prepend_response(h2_stream *stream, h2_headers *response)
+{
+ conn_rec *c = stream->session->c;
+ apr_bucket *b;
+
+ prep_output(stream);
+ b = h2_bucket_headers_create(c->bucket_alloc, response);
+ APR_BRIGADE_INSERT_HEAD(stream->buffer, b);
+}
+
static apr_status_t stream_pool_cleanup(void *ctx)
{
h2_stream *stream = ctx;
@@ -252,21 +270,6 @@ void h2_stream_rst(h2_stream *stream, int error_code)
stream->session->id, stream->id, error_code);
}
-struct h2_response *h2_stream_get_response(h2_stream *stream)
-{
- return stream->response;
-}
-
-struct h2_response *h2_stream_get_unsent_response(h2_stream *stream)
-{
- h2_response *unsent = (stream->last_sent?
- stream->last_sent->next : stream->response);
- if (unsent) {
- stream->last_sent = unsent;
- }
- return unsent;
-}
-
apr_status_t h2_stream_set_request_rec(h2_stream *stream, request_rec *r)
{
h2_request *req;
@@ -277,8 +280,7 @@ apr_status_t h2_stream_set_request_rec(h2_stream *stream, request_rec *r)
if (stream->rst_error) {
return APR_ECONNRESET;
}
- status = h2_request_rcreate(&req, stream->pool, stream->id,
- stream->initiated_on, r);
+ status = h2_request_rcreate(&req, stream->pool, r);
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
"h2_request(%d): set_request_rec %s host=%s://%s%s",
stream->id, req->method, req->scheme, req->authority,
@@ -295,13 +297,40 @@ apr_status_t h2_stream_set_request(h2_stream *stream, const h2_request *r)
return APR_SUCCESS;
}
+static apr_status_t add_trailer(h2_stream *stream,
+ const char *name, size_t nlen,
+ const char *value, size_t vlen)
+{
+ conn_rec *c = stream->session->c;
+ char *hname, *hvalue;
+
+ if (nlen == 0 || name[0] == ':') {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, c, APLOGNO(03060)
+ "h2_request(%ld-%d): pseudo header in trailer",
+ c->id, stream->id);
+ return APR_EINVAL;
+ }
+ if (h2_req_ignore_trailer(name, nlen)) {
+ return APR_SUCCESS;
+ }
+ if (!stream->trailers) {
+ stream->trailers = apr_table_make(stream->pool, 5);
+ }
+ hname = apr_pstrndup(stream->pool, name, nlen);
+ hvalue = apr_pstrndup(stream->pool, value, vlen);
+ h2_util_camel_case_header(hname, nlen);
+ apr_table_mergen(stream->trailers, hname, hvalue);
+
+ return APR_SUCCESS;
+}
+
apr_status_t h2_stream_add_header(h2_stream *stream,
const char *name, size_t nlen,
const char *value, size_t vlen)
{
AP_DEBUG_ASSERT(stream);
- if (!stream->response) {
+ if (!stream->has_response) {
if (name[0] == ':') {
if ((vlen) > stream->session->s->limit_req_line) {
/* pseudo header: approximation of request line size check */
@@ -336,10 +365,7 @@ apr_status_t h2_stream_add_header(h2_stream *stream,
}
if (h2_stream_is_scheduled(stream)) {
- /* FIXME: this is not clean. we modify a struct that is being processed
- * by another thread potentially. */
- return h2_request_add_trailer((h2_request*)stream->request, stream->pool,
- name, nlen, value, vlen);
+ return add_trailer(stream, name, nlen, value, vlen);
}
else {
if (!stream->rtmp) {
@@ -366,36 +392,38 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
if (eos) {
close_input(stream);
}
+
+ if (!stream->input) {
+ h2_beam_create(&stream->input, stream->pool, stream->id, "input", 0);
+ }
- if (stream->response) {
+ if (h2_stream_is_ready(stream)) {
/* already have a resonse, probably a HTTP error code */
return h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
}
else if (!stream->request && stream->rtmp) {
/* This is the common case: a h2_request was being assembled, now
* it gets finalized and checked for completness */
- status = h2_request_end_headers(stream->rtmp, stream->pool,
- eos, push_enabled);
+ status = h2_request_end_headers(stream->rtmp, stream->pool, eos);
if (status == APR_SUCCESS) {
- stream->rtmp->id = stream->id;
- stream->rtmp->initiated_on = stream->initiated_on;
stream->rtmp->serialize = h2_config_geti(stream->session->config,
H2_CONF_SER_HEADERS);
stream->request = stream->rtmp;
stream->rtmp = NULL;
stream->scheduled = 1;
- stream->input_remaining = stream->request->content_length;
+ stream->push_policy = h2_push_policy_determine(stream->request->headers,
+ stream->pool, push_enabled);
+
status = h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): scheduled %s %s://%s%s "
- "clen=%ld, body=%d, chunked=%d",
+ "chunked=%d",
stream->session->id, stream->id,
stream->request->method, stream->request->scheme,
stream->request->authority, stream->request->path,
- (long)stream->request->content_length,
- stream->request->body, stream->request->chunked);
+ stream->request->chunked);
return status;
}
}
@@ -420,21 +448,36 @@ int h2_stream_is_scheduled(const h2_stream *stream)
apr_status_t h2_stream_close_input(h2_stream *stream)
{
+ conn_rec *c = stream->session->c;
apr_status_t status = APR_SUCCESS;
-
- AP_DEBUG_ASSERT(stream);
+
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): closing input",
stream->session->id, stream->id);
-
if (stream->rst_error) {
return APR_ECONNRESET;
}
- if (close_input(stream) && stream->input) {
- status = h2_beam_close(stream->input);
+ if (!stream->input) {
+ h2_beam_create(&stream->input, stream->pool, stream->id, "input", 0);
}
- return status;
+
+ if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
+ h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers,
+ NULL, stream->pool);
+ apr_bucket *b = h2_bucket_headers_create(c->bucket_alloc, r);
+ apr_bucket_brigade *tmp;
+
+ tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(tmp, b);
+ status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
+ apr_brigade_destroy(tmp);
+
+ stream->trailers = NULL;
+ }
+
+ close_input(stream);
+ return h2_beam_close(stream->input);
}
apr_status_t h2_stream_write_data(h2_stream *stream,
@@ -459,52 +502,22 @@ apr_status_t h2_stream_write_data(h2_stream *stream,
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): add %ld input bytes",
stream->session->id, stream->id, (long)len);
-
- if (!stream->request->chunked) {
- stream->input_remaining -= len;
- if (stream->input_remaining < 0) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c,
- APLOGNO(02961)
- "h2_stream(%ld-%d): got %ld more content bytes than announced "
- "in content-length header: %ld",
- stream->session->id, stream->id,
- (long)stream->request->content_length,
- -(long)stream->input_remaining);
- h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
- return APR_ECONNABORTED;
- }
- }
tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
apr_brigade_write(tmp, NULL, NULL, data, len);
- if (eos) {
- APR_BRIGADE_INSERT_TAIL(tmp, apr_bucket_eos_create(c->bucket_alloc));
- close_input(stream);
- }
status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
apr_brigade_destroy(tmp);
stream->in_data_frames++;
stream->in_data_octets += len;
+ if (eos) {
+ return h2_stream_close_input(stream);
+ }
+
return status;
}
-void h2_stream_set_suspended(h2_stream *stream, int suspended)
-{
- AP_DEBUG_ASSERT(stream);
- stream->suspended = !!suspended;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
- "h2_stream(%ld-%d): suspended=%d",
- stream->session->id, stream->id, stream->suspended);
-}
-
-int h2_stream_is_suspended(const h2_stream *stream)
-{
- AP_DEBUG_ASSERT(stream);
- return stream->suspended;
-}
-
static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
{
conn_rec *c = stream->session->c;
@@ -549,89 +562,64 @@ static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
return status;
}
-apr_status_t h2_stream_add_response(h2_stream *stream, h2_response *response,
- h2_bucket_beam *output)
-{
- apr_status_t status = APR_SUCCESS;
- conn_rec *c = stream->session->c;
- h2_response **pr = &stream->response;
-
- if (!output_open(stream)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- "h2_stream(%ld-%d): output closed",
- stream->session->id, stream->id);
- return APR_ECONNRESET;
- }
- if (stream->submitted) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- "h2_stream(%ld-%d): already submitted final response",
- stream->session->id, stream->id);
- return APR_ECONNRESET;
- }
-
- /* append */
- while (*pr) {
- pr = &((*pr)->next);
- }
- *pr = response;
-
- if (h2_response_is_final(response)) {
- stream->output = output;
- stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
-
- h2_stream_filter(stream);
- if (stream->output) {
- status = fill_buffer(stream, 0);
- }
- }
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_stream(%ld-%d): set_response(%d)",
- stream->session->id, stream->id,
- stream->response->http_status);
- return status;
-}
-
apr_status_t h2_stream_set_error(h2_stream *stream, int http_status)
{
- h2_response *response;
+ h2_headers *response;
- if (stream->submitted) {
+ if (h2_stream_is_ready(stream)) {
return APR_EINVAL;
}
if (stream->rtmp) {
stream->request = stream->rtmp;
stream->rtmp = NULL;
}
- response = h2_response_die(stream->id, http_status,
- stream->request, stream->pool);
- return h2_stream_add_response(stream, response, NULL);
+ response = h2_headers_die(http_status, stream->request, stream->pool);
+ prepend_response(stream, response);
+ return APR_SUCCESS;
}
-static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9);
+static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
+{
+ if (bb) {
+ apr_bucket *b = APR_BRIGADE_FIRST(bb);
+ while (b != APR_BRIGADE_SENTINEL(bb)) {
+ if (H2_BUCKET_IS_HEADERS(b)) {
+ return b;
+ }
+ b = APR_BUCKET_NEXT(b);
+ }
+ }
+ return NULL;
+}
-apr_status_t h2_stream_out_prepare(h2_stream *stream,
- apr_off_t *plen, int *peos)
+apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
+ int *peos, h2_headers **presponse)
{
conn_rec *c = stream->session->c;
apr_status_t status = APR_SUCCESS;
apr_off_t requested;
+ apr_bucket *b, *e;
+ if (presponse) {
+ *presponse = NULL;
+ }
+
if (stream->rst_error) {
*plen = 0;
*peos = 1;
return APR_ECONNRESET;
}
-
- if (!stream->buffer) {
- return APR_EAGAIN;
- }
+ if (!output_open(stream)) {
+ return APR_ECONNRESET;
+ }
+ prep_output(stream);
+
if (*plen > 0) {
- requested = H2MIN(*plen, DATA_CHUNK_SIZE);
+ requested = H2MIN(*plen, H2_DATA_CHUNK_SIZE);
}
else {
- requested = DATA_CHUNK_SIZE;
+ requested = H2_DATA_CHUNK_SIZE;
}
*plen = requested;
@@ -639,7 +627,7 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream,
h2_util_bb_avail(stream->buffer, plen, peos);
if (!*peos && *plen < requested) {
/* try to get more data */
- status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE);
+ status = fill_buffer(stream, (requested - *plen) + H2_DATA_CHUNK_SIZE);
if (APR_STATUS_IS_EOF(status)) {
apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
@@ -653,17 +641,66 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream,
h2_util_bb_avail(stream->buffer, plen, peos);
}
H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post");
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s",
- c->id, stream->id, (long)*plen, *peos,
- (stream->response && stream->response->trailers)?
- "yes" : "no");
- if (!*peos && !*plen && status == APR_SUCCESS) {
- return APR_EAGAIN;
+
+ b = APR_BRIGADE_FIRST(stream->buffer);
+ while (b != APR_BRIGADE_SENTINEL(stream->buffer)) {
+ e = APR_BUCKET_NEXT(b);
+ if (APR_BUCKET_IS_FLUSH(b)) {
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ }
+ else {
+ break;
+ }
+ b = e;
}
+
+ b = get_first_headers_bucket(stream->buffer);
+ if (b) {
+ /* there are HEADERS to submit */
+ *peos = 0;
+ *plen = 0;
+ if (b == APR_BRIGADE_FIRST(stream->buffer)) {
+ if (presponse) {
+ *presponse = h2_bucket_headers_get(b);
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ status = APR_SUCCESS;
+ }
+ else {
+ /* someone needs to retrieve the response first */
+ h2_mplx_keep_active(stream->session->mplx, stream->id);
+ status = APR_EAGAIN;
+ }
+ }
+ else {
+ apr_bucket *e = APR_BRIGADE_FIRST(stream->buffer);
+ while (e != APR_BRIGADE_SENTINEL(stream->buffer)) {
+ if (e == b) {
+ break;
+ }
+ else if (e->length != (apr_size_t)-1) {
+ *plen += e->length;
+ }
+ e = APR_BUCKET_NEXT(e);
+ }
+ }
+ }
+
+ if (!*peos && !*plen && status == APR_SUCCESS
+ && (!presponse || !*presponse)) {
+ status = APR_EAGAIN;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+ "h2_stream(%ld-%d): prepare, len=%ld eos=%d",
+ c->id, stream->id, (long)*plen, *peos);
return status;
}
+static int is_not_headers(apr_bucket *b)
+{
+ return !H2_BUCKET_IS_HEADERS(b);
+}
apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
apr_off_t *plen, int *peos)
@@ -674,7 +711,7 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
if (stream->rst_error) {
return APR_ECONNRESET;
}
- status = h2_append_brigade(bb, stream->buffer, plen, peos);
+ status = h2_append_brigade(bb, stream->buffer, plen, peos, is_not_headers);
if (status == APR_SUCCESS && !*peos && !*plen) {
status = APR_EAGAIN;
}
@@ -690,27 +727,13 @@ int h2_stream_input_is_open(const h2_stream *stream)
return input_open(stream);
}
-int h2_stream_needs_submit(const h2_stream *stream)
-{
- switch (stream->state) {
- case H2_STREAM_ST_OPEN:
- case H2_STREAM_ST_CLOSED_INPUT:
- case H2_STREAM_ST_CLOSED_OUTPUT:
- case H2_STREAM_ST_CLOSED:
- return !stream->submitted;
- default:
- return 0;
- }
-}
-
-apr_status_t h2_stream_submit_pushes(h2_stream *stream)
+apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
{
apr_status_t status = APR_SUCCESS;
apr_array_header_t *pushes;
int i;
- pushes = h2_push_collect_update(stream, stream->request,
- stream->response);
+ pushes = h2_push_collect_update(stream, stream->request, response);
if (pushes && !apr_is_empty_array(pushes)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): found %d push candidates",
@@ -729,13 +752,14 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream)
apr_table_t *h2_stream_get_trailers(h2_stream *stream)
{
- return stream->response? stream->response->trailers : NULL;
+ return NULL;
}
-const h2_priority *h2_stream_get_priority(h2_stream *stream)
+const h2_priority *h2_stream_get_priority(h2_stream *stream,
+ h2_headers *response)
{
- if (stream->response && stream->initiated_on) {
- const char *ctype = apr_table_get(stream->response->headers, "content-type");
+ if (response && stream->initiated_on) {
+ const char *ctype = apr_table_get(response->headers, "content-type");
if (ctype) {
/* FIXME: Not good enough, config needs to come from request->server */
return h2_config_get_priority(stream->session->config, ctype);
@@ -767,3 +791,15 @@ const char *h2_stream_state_str(h2_stream *stream)
}
}
+int h2_stream_is_ready(h2_stream *stream)
+{
+ if (stream->has_response) {
+ return 1;
+ }
+ else if (stream->buffer && get_first_headers_bucket(stream->buffer)) {
+ return 1;
+ }
+ return 0;
+}
+
+
diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h
index e871cb7ee2..346e4e2a22 100644
--- a/modules/http2/h2_stream.h
+++ b/modules/http2/h2_stream.h
@@ -25,16 +25,16 @@
* connection to the client. The h2_session writes to the h2_stream,
* adding HEADERS and DATA and finally an EOS. When headers are done,
* h2_stream is scheduled for handling, which is expected to produce
- * a h2_response.
+ * a h2_headers.
*
- * The h2_response gives the HEADER frames to sent to the client, followed
+ * The h2_headers gives the HEADER frames to sent to the client, followed
* by DATA frames read from the h2_stream until EOS is reached.
*/
struct h2_mplx;
struct h2_priority;
struct h2_request;
-struct h2_response;
+struct h2_headers;
struct h2_session;
struct h2_sos;
struct h2_bucket_beam;
@@ -51,27 +51,27 @@ struct h2_stream {
apr_pool_t *pool; /* the memory pool for this stream */
const struct h2_request *request; /* the request made in this stream */
struct h2_request *rtmp; /* request being assembled */
+ apr_table_t *trailers; /* optional incoming trailers */
struct h2_bucket_beam *input;
int request_headers_added; /* number of request headers added */
+ unsigned int push_policy; /* which push policy to use for this request */
- struct h2_response *response;
- struct h2_response *last_sent;
struct h2_bucket_beam *output;
apr_bucket_brigade *buffer;
apr_array_header_t *files; /* apr_file_t* we collected during I/O */
int rst_error; /* stream error for RST_STREAM */
unsigned int aborted : 1; /* was aborted */
- unsigned int suspended : 1; /* DATA sending has been suspended */
unsigned int scheduled : 1; /* stream has been scheduled */
unsigned int started : 1; /* stream has started processing */
- unsigned int submitted : 1; /* response HEADER has been sent */
+ unsigned int has_response : 1; /* response headers are known */
- apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */
apr_off_t out_data_frames; /* # of DATA frames sent */
apr_off_t out_data_octets; /* # of DATA octets (payload) sent */
apr_off_t in_data_frames; /* # of DATA frames received */
apr_off_t in_data_octets; /* # of DATA octets (payload) received */
+
+ const char *sos_filter;
};
@@ -188,22 +188,6 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
*/
int h2_stream_is_scheduled(const h2_stream *stream);
-struct h2_response *h2_stream_get_response(h2_stream *stream);
-struct h2_response *h2_stream_get_unsent_response(h2_stream *stream);
-
-/**
- * Set the response for this stream. Invoked when all meta data for
- * the stream response has been collected.
- *
- * @param stream the stream to set the response for
- * @param response the response data for the stream
- * @param bb bucket brigade with output data for the stream. Optional,
- * may be incomplete.
- */
-apr_status_t h2_stream_add_response(h2_stream *stream,
- struct h2_response *response,
- struct h2_bucket_beam *output);
-
/**
* Set the HTTP error status as response.
*/
@@ -218,12 +202,13 @@ apr_status_t h2_stream_set_error(h2_stream *stream, int http_status);
* may be read without blocking
* @param peos (out) != 0 iff end of stream will be reached when reading plen
* bytes (out value).
+ * @param presponse (out) the response of one became available
* @return APR_SUCCESS if out information was computed successfully.
* APR_EAGAIN if not data is available and end of stream has not been
* reached yet.
*/
-apr_status_t h2_stream_out_prepare(h2_stream *stream,
- apr_off_t *plen, int *peos);
+apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
+ int *peos, h2_headers **presponse);
/**
* Read a maximum number of bytes into the bucket brigade.
@@ -251,20 +236,6 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
apr_table_t *h2_stream_get_trailers(h2_stream *stream);
/**
- * Set the suspended state of the stream.
- * @param stream the stream to change state on
- * @param suspended boolean value if stream is suspended
- */
-void h2_stream_set_suspended(h2_stream *stream, int suspended);
-
-/**
- * Check if the stream has been suspended.
- * @param stream the stream to check
- * @return != 0 iff stream is suspended.
- */
-int h2_stream_is_suspended(const h2_stream *stream);
-
-/**
* Check if the stream has open input.
* @param stream the stream to check
* @return != 0 iff stream has open input.
@@ -272,24 +243,18 @@ int h2_stream_is_suspended(const h2_stream *stream);
int h2_stream_input_is_open(const h2_stream *stream);
/**
- * Check if the stream has not submitted a response or RST yet.
- * @param stream the stream to check
- * @return != 0 iff stream has not submitted a response or RST.
- */
-int h2_stream_needs_submit(const h2_stream *stream);
-
-/**
* Submit any server push promises on this stream and schedule
* the tasks connection with these.
*
* @param stream the stream for which to submit
*/
-apr_status_t h2_stream_submit_pushes(h2_stream *stream);
+apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response);
/**
* Get priority information set for this stream.
*/
-const struct h2_priority *h2_stream_get_priority(h2_stream *stream);
+const struct h2_priority *h2_stream_get_priority(h2_stream *stream,
+ h2_headers *response);
/**
* Return a textual representation of the stream state as in RFC 7540
@@ -297,4 +262,10 @@ const struct h2_priority *h2_stream_get_priority(h2_stream *stream);
*/
const char *h2_stream_state_str(h2_stream *stream);
+/**
+ * Determine if stream is ready for submitting a response or a RST
+ * @param stream the stream to check
+ */
+int h2_stream_is_ready(h2_stream *stream);
+
#endif /* defined(__mod_h2__h2_stream__) */
diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c
index 87ab619acc..773b3768cd 100644
--- a/modules/http2/h2_task.c
+++ b/modules/http2/h2_task.c
@@ -42,13 +42,27 @@
#include "h2_h2.h"
#include "h2_mplx.h"
#include "h2_request.h"
-#include "h2_response.h"
+#include "h2_headers.h"
#include "h2_session.h"
#include "h2_stream.h"
#include "h2_task.h"
#include "h2_worker.h"
#include "h2_util.h"
+static void H2_TASK_OUT_LOG(int lvl, h2_task *task, apr_bucket_brigade *bb, char *tag)
+{
+ if (APLOG_C_IS_LEVEL(task->c, lvl)) {
+ conn_rec *c = task->c;
+ char buffer[4 * 1024];
+ const char *line = "(null)";
+ apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
+
+ len = h2_util_bb_print(buffer, bmax, tag, "", bb);
+ ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%s): %s",
+ task->id, len? buffer : line);
+ }
+}
+
/*******************************************************************************
* task input handling
******************************************************************************/
@@ -60,90 +74,13 @@ static int input_ser_header(void *ctx, const char *name, const char *value)
return 1;
}
-static void make_chunk(h2_task *task, apr_bucket_brigade *bb,
- apr_bucket *first, apr_uint64_t chunk_len,
- apr_bucket *tail)
-{
- /* Surround the buckets [first, tail[ with new buckets carrying the
- * HTTP/1.1 chunked encoding format. If tail is NULL, the chunk extends
- * to the end of the brigade. */
- char buffer[128];
- apr_bucket *c;
- int len;
-
- len = apr_snprintf(buffer, H2_ALEN(buffer),
- "%"APR_UINT64_T_HEX_FMT"\r\n", chunk_len);
- c = apr_bucket_heap_create(buffer, len, NULL, bb->bucket_alloc);
- APR_BUCKET_INSERT_BEFORE(first, c);
- c = apr_bucket_heap_create("\r\n", 2, NULL, bb->bucket_alloc);
- if (tail) {
- APR_BUCKET_INSERT_BEFORE(tail, c);
- }
- else {
- APR_BRIGADE_INSERT_TAIL(bb, c);
- }
-}
-
-static apr_status_t input_handle_eos(h2_task *task, request_rec *r,
- apr_bucket *b)
-{
- apr_status_t status = APR_SUCCESS;
- apr_bucket_brigade *bb = task->input.bb;
- apr_table_t *t = task->request->trailers;
-
- if (task->input.chunked) {
- apr_bucket_brigade *tmp = apr_brigade_split_ex(bb, b, NULL);
- if (t && !apr_is_empty_table(t)) {
- status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
- apr_table_do(input_ser_header, task, t, NULL);
- status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
- }
- else {
- status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
- }
- APR_BRIGADE_CONCAT(bb, tmp);
- apr_brigade_destroy(tmp);
- }
- else if (r && t && !apr_is_empty_table(t)){
- /* trailers passed in directly. */
- apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET);
- }
- task->input.eos_written = 1;
- return status;
-}
-
-static apr_status_t input_append_eos(h2_task *task, request_rec *r)
-{
- apr_status_t status = APR_SUCCESS;
- apr_bucket_brigade *bb = task->input.bb;
- apr_table_t *t = task->request->trailers;
-
- if (task->input.chunked) {
- if (t && !apr_is_empty_table(t)) {
- status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
- apr_table_do(input_ser_header, task, t, NULL);
- status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
- }
- else {
- status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
- }
- }
- else if (r && t && !apr_is_empty_table(t)){
- /* trailers passed in directly. */
- apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET);
- }
- APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(bb->bucket_alloc));
- task->input.eos_written = 1;
- return status;
-}
-
static apr_status_t input_read(h2_task *task, ap_filter_t* f,
apr_bucket_brigade* bb, ap_input_mode_t mode,
apr_read_type_e block, apr_off_t readbytes)
{
apr_status_t status = APR_SUCCESS;
- apr_bucket *b, *next, *first_data;
- apr_off_t bblen = 0;
+ apr_bucket *b, *next;
+ apr_off_t bblen;
apr_size_t rmax = ((readbytes <= APR_SIZE_MAX)?
(apr_size_t)readbytes : APR_SIZE_MAX);
@@ -160,28 +97,9 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f,
}
if (!task->input.bb) {
- if (!task->input.eos_written) {
- input_append_eos(task, f->r);
- return APR_SUCCESS;
- }
return APR_EOF;
}
- /*
- if (f->r && f->r->expecting_100) {
- ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, f->c,
- "h2_task(%s): need to send 100 Continue here",
- task->id);
- f->r->expecting_100 = 0;
- }
- if (task->r && task->r->expecting_100) {
- ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, f->c,
- "h2_task2(%s): need to send 100 Continue here",
- task->id);
- task->r->expecting_100 = 0;
- }
- */
-
/* Cleanup brigades from those nasty 0 length non-meta buckets
* that apr_brigade_split_line() sometimes produces. */
for (b = APR_BRIGADE_FIRST(task->input.bb);
@@ -192,12 +110,11 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f,
}
}
- while (APR_BRIGADE_EMPTY(task->input.bb) && !task->input.eos) {
+ while (APR_BRIGADE_EMPTY(task->input.bb)) {
/* Get more input data for our request. */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task(%s): get more data from mplx, block=%d, "
- "readbytes=%ld, queued=%ld",
- task->id, block, (long)readbytes, (long)bblen);
+ "readbytes=%ld", task->id, block, (long)readbytes);
/* Override the block mode we get called with depending on the input's
* setting. */
@@ -219,7 +136,7 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f,
status = APR_SUCCESS;
}
else if (APR_STATUS_IS_EOF(status)) {
- task->input.eos = 1;
+ break;
}
else if (status != APR_SUCCESS) {
return status;
@@ -229,51 +146,14 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f,
* chunked encoding if necessary */
h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
"input.beam recv raw", task->input.bb);
- first_data = NULL;
- bblen = 0;
- for (b = APR_BRIGADE_FIRST(task->input.bb);
- b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
- next = APR_BUCKET_NEXT(b);
- if (APR_BUCKET_IS_METADATA(b)) {
- if (first_data && task->input.chunked) {
- make_chunk(task, task->input.bb, first_data, bblen, b);
- first_data = NULL;
- bblen = 0;
- }
- if (APR_BUCKET_IS_EOS(b)) {
- task->input.eos = 1;
- input_handle_eos(task, f->r, b);
- h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
- "input.bb after handle eos",
- task->input.bb);
- }
- }
- else if (b->length == 0) {
- apr_bucket_delete(b);
- }
- else {
- if (!first_data) {
- first_data = b;
- }
- bblen += b->length;
- }
- }
- if (first_data && task->input.chunked) {
- make_chunk(task, task->input.bb, first_data, bblen, NULL);
- }
-
if (h2_task_logio_add_bytes_in) {
+ apr_brigade_length(bb, 0, &bblen);
h2_task_logio_add_bytes_in(f->c, bblen);
}
}
- if (task->input.eos) {
- if (!task->input.eos_written) {
- input_append_eos(task, f->r);
- }
- if (APR_BRIGADE_EMPTY(task->input.bb)) {
- return APR_EOF;
- }
+ if (status == APR_EOF && APR_BRIGADE_EMPTY(task->input.bb)) {
+ return APR_EOF;
}
h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
@@ -333,32 +213,15 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f,
* task output handling
******************************************************************************/
-static apr_status_t open_response(h2_task *task, h2_response *response)
+static apr_status_t open_output(h2_task *task)
{
- if (!response) {
- /* This happens currently when ap_die(status, r) is invoked
- * by a read request filter. */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, APLOGNO(03204)
- "h2_task(%s): write without response for %s %s %s",
- task->id,
- task->request->method,
- task->request->authority,
- task->request->path);
- task->c->aborted = 1;
- return APR_ECONNABORTED;
- }
-
- if (h2_task_logio_add_bytes_out) {
- /* count headers as if we'd do a HTTP/1.1 serialization */
- task->output.written = h2_util_table_bytes(response->headers, 3)+1;
- h2_task_logio_add_bytes_out(task->c, task->output.written);
- }
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, APLOGNO(03348)
- "h2_task(%s): open response to %s %s %s",
+ "h2_task(%s): open output to %s %s %s",
task->id, task->request->method,
task->request->authority,
task->request->path);
- return h2_mplx_out_open(task->mplx, task->stream_id, response);
+ task->output.opened = 1;
+ return h2_mplx_out_open(task->mplx, task->stream_id, task->output.beam);
}
static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb)
@@ -367,23 +230,22 @@ static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb)
apr_status_t status;
apr_brigade_length(bb, 0, &written);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
- "h2_task(%s): write response body (%ld bytes)",
- task->id, (long)written);
-
+ H2_TASK_OUT_LOG(APLOG_TRACE2, task, bb, "h2_task send_out");
+ /* engines send unblocking */
status = h2_beam_send(task->output.beam, bb,
- task->blocking? APR_BLOCK_READ
- : APR_NONBLOCK_READ);
+ task->assigned? APR_NONBLOCK_READ
+ : APR_BLOCK_READ);
if (APR_STATUS_IS_EAGAIN(status)) {
apr_brigade_length(bb, 0, &left);
written -= left;
status = APR_SUCCESS;
}
if (status == APR_SUCCESS) {
- task->output.written += written;
if (h2_task_logio_add_bytes_out) {
h2_task_logio_add_bytes_out(task->c, written);
}
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, task->c,
+ "h2_task(%s): send_out done", task->id);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c,
@@ -397,8 +259,8 @@ static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb)
* request_rec out filter chain) into the h2_mplx for further sending
* on the master connection.
*/
-static apr_status_t output_write(h2_task *task, ap_filter_t* f,
- apr_bucket_brigade* bb)
+static apr_status_t slave_out(h2_task *task, ap_filter_t* f,
+ apr_bucket_brigade* bb)
{
apr_bucket *b;
apr_status_t status = APR_SUCCESS;
@@ -448,7 +310,7 @@ static apr_status_t output_write(h2_task *task, ap_filter_t* f,
if ((!task->output.bb || APR_BRIGADE_EMPTY(task->output.bb))
&& !APR_BRIGADE_EMPTY(bb)) {
/* check if we have a flush before the end-of-request */
- if (!task->output.response_open) {
+ if (!task->output.opened) {
for (b = APR_BRIGADE_FIRST(bb);
b != APR_BRIGADE_SENTINEL(bb);
b = APR_BUCKET_NEXT(b)) {
@@ -476,42 +338,40 @@ static apr_status_t output_write(h2_task *task, ap_filter_t* f,
task->output.bb = apr_brigade_create(task->pool,
task->c->bucket_alloc);
}
- return ap_save_brigade(f, &task->output.bb, &bb, task->pool);
+ status = ap_save_brigade(f, &task->output.bb, &bb, task->pool);
+ if (status != APR_SUCCESS) {
+ return status;
+ }
}
- if (!task->output.response_open
+ if (!task->output.opened
&& (flush || h2_beam_get_mem_used(task->output.beam) > (32*1024))) {
/* if we have enough buffered or we got a flush bucket, open
* the response now. */
- status = open_response(task,
- h2_from_h1_get_response(task->output.from_h1));
- task->output.response_open = 1;
+ status = open_output(task);
}
-
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, task->c,
+ "h2_task(%s): slave_out leave", task->id);
return status;
}
static apr_status_t output_finish(h2_task *task)
{
- apr_status_t status = APR_SUCCESS;
-
- if (!task->output.response_open) {
- status = open_response(task,
- h2_from_h1_get_response(task->output.from_h1));
- task->output.response_open = 1;
+ if (!task->output.opened) {
+ return open_output(task);
}
- return status;
+ return APR_SUCCESS;
}
/*******************************************************************************
* task slave connection filters
******************************************************************************/
-static apr_status_t h2_filter_stream_input(ap_filter_t* filter,
- apr_bucket_brigade* brigade,
- ap_input_mode_t mode,
- apr_read_type_e block,
- apr_off_t readbytes)
+static apr_status_t h2_filter_slave_input(ap_filter_t* filter,
+ apr_bucket_brigade* brigade,
+ ap_input_mode_t mode,
+ apr_read_type_e block,
+ apr_off_t readbytes)
{
h2_task *task = h2_ctx_cget_task(filter->c);
AP_DEBUG_ASSERT(task);
@@ -527,15 +387,22 @@ static apr_status_t h2_filter_continue(ap_filter_t* f,
h2_task *task = h2_ctx_cget_task(f->c);
apr_status_t status;
- AP_DEBUG_ASSERT(task);
+ ap_assert(task);
if (f->r->expecting_100 && ap_is_HTTP_SUCCESS(f->r->status)) {
- h2_response *response;
-
- response = h2_response_rcreate(task->stream_id, f->r, HTTP_CONTINUE,
- NULL, f->r->pool);
- ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, f->r,
- "h2_task(%s): send 100 Continue", task->id);
- status = open_response(task, response);
+ h2_headers *response;
+ apr_bucket_brigade *tmp;
+ apr_bucket *b;
+
+ response = h2_headers_rcreate(f->r, HTTP_CONTINUE, NULL, f->r->pool);
+ tmp = apr_brigade_create(f->r->pool, f->c->bucket_alloc);
+ b = h2_bucket_headers_create(f->c->bucket_alloc, response);
+ APR_BRIGADE_INSERT_TAIL(tmp, b);
+ b = apr_bucket_flush_create(f->c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(tmp, b);
+ status = ap_pass_brigade(f->r->output_filters, tmp);
+ apr_brigade_destroy(tmp);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, f->r,
+ "h2_task(%s): sent 100 Continue", task->id);
if (status != APR_SUCCESS) {
return status;
}
@@ -545,53 +412,26 @@ static apr_status_t h2_filter_continue(ap_filter_t* f,
return ap_get_brigade(f->next, brigade, mode, block, readbytes);
}
-static apr_status_t h2_filter_stream_output(ap_filter_t* filter,
- apr_bucket_brigade* brigade)
+static apr_status_t h2_filter_slave_output(ap_filter_t* filter,
+ apr_bucket_brigade* brigade)
{
h2_task *task = h2_ctx_cget_task(filter->c);
apr_status_t status;
ap_assert(task);
- status = output_write(task, filter, brigade);
+ status = slave_out(task, filter, brigade);
if (status != APR_SUCCESS) {
h2_task_rst(task, H2_ERR_INTERNAL_ERROR);
}
return status;
}
-static apr_status_t h2_filter_read_response(ap_filter_t* filter,
- apr_bucket_brigade* bb)
-{
- h2_task *task = h2_ctx_cget_task(filter->c);
- AP_DEBUG_ASSERT(task);
- if (!task->output.from_h1) {
- return APR_ECONNABORTED;
- }
- return h2_from_h1_read_response(task->output.from_h1, filter, bb);
-}
-
/*******************************************************************************
* task things
******************************************************************************/
-apr_status_t h2_task_add_response(h2_task *task, h2_response *response)
-{
- AP_DEBUG_ASSERT(response);
- /* we used to clone the response into out own pool. But
- * we have much tighter control over the EOR bucket nowadays,
- * so just use the instance given */
- response->next = task->response;
- task->response = response;
- if (response->rst_error) {
- h2_task_rst(task, response->rst_error);
- }
- return APR_SUCCESS;
-}
-
-
int h2_task_can_redo(h2_task *task) {
- if (task->response_sent
- || (task->input.beam && h2_beam_was_received(task->input.beam))) {
+ if (task->input.beam && h2_beam_was_received(task->input.beam)) {
/* cannot repeat that. */
return 0;
}
@@ -602,7 +442,6 @@ int h2_task_can_redo(h2_task *task) {
void h2_task_redo(h2_task *task)
{
- task->response = NULL;
task->rst_error = 0;
}
@@ -612,7 +451,7 @@ void h2_task_rst(h2_task *task, int error)
if (task->input.beam) {
h2_beam_abort(task->input.beam);
}
- if (task->output.beam) {
+ if (!task->worker_done && task->output.beam) {
h2_beam_abort(task->output.beam);
}
if (task->c) {
@@ -644,17 +483,18 @@ void h2_task_register_hooks(void)
ap_hook_process_connection(h2_task_process_conn,
NULL, NULL, APR_HOOK_FIRST);
- ap_register_output_filter("H2_RESPONSE", h2_response_output_filter,
- NULL, AP_FTYPE_PROTOCOL);
- ap_register_input_filter("H2_TO_H1", h2_filter_stream_input,
+ ap_register_input_filter("H2_SLAVE_IN", h2_filter_slave_input,
NULL, AP_FTYPE_NETWORK);
+ ap_register_output_filter("H2_SLAVE_OUT", h2_filter_slave_output,
+ NULL, AP_FTYPE_NETWORK);
+
ap_register_input_filter("H2_CONTINUE", h2_filter_continue,
NULL, AP_FTYPE_PROTOCOL);
- ap_register_output_filter("H1_TO_H2", h2_filter_stream_output,
- NULL, AP_FTYPE_NETWORK);
- ap_register_output_filter("H1_TO_H2_RESP", h2_filter_read_response,
+ ap_register_input_filter("H2_REQUEST", h2_filter_request_in,
+ NULL, AP_FTYPE_PROTOCOL);
+ ap_register_output_filter("H2_RESPONSE", h2_headers_output_filter,
NULL, AP_FTYPE_PROTOCOL);
- ap_register_output_filter("H2_TRAILERS", h2_response_trailers_filter,
+ ap_register_output_filter("H2_TRAILERS_OUT", h2_filter_trailers_out,
NULL, AP_FTYPE_PROTOCOL);
}
@@ -680,17 +520,15 @@ static int h2_task_pre_conn(conn_rec* c, void *arg)
if (h2_ctx_is_task(ctx)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
"h2_h2, pre_connection, found stream task");
-
- /* Add our own, network level in- and output filters.
- */
- ap_add_input_filter("H2_TO_H1", NULL, NULL, c);
- ap_add_output_filter("H1_TO_H2", NULL, NULL, c);
+ ap_add_input_filter("H2_SLAVE_IN", NULL, NULL, c);
+ ap_add_output_filter("H2_SLAVE_OUT", NULL, NULL, c);
}
return OK;
}
-h2_task *h2_task_create(conn_rec *c, const h2_request *req,
- h2_bucket_beam *input, h2_mplx *mplx)
+h2_task *h2_task_create(conn_rec *c, apr_uint32_t stream_id,
+ const h2_request *req, h2_bucket_beam *input,
+ h2_mplx *mplx)
{
apr_pool_t *pool;
h2_task *task;
@@ -704,18 +542,16 @@ h2_task *h2_task_create(conn_rec *c, const h2_request *req,
if (task == NULL) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, c,
APLOGNO(02941) "h2_task(%ld-%d): create stream task",
- c->id, req->id);
+ c->id, stream_id);
return NULL;
}
- task->id = apr_psprintf(pool, "%ld-%d", c->id, req->id);
- task->stream_id = req->id;
+ task->id = apr_psprintf(pool, "%ld-%d", c->master->id, stream_id);
+ task->stream_id = stream_id;
task->c = c;
task->mplx = mplx;
task->c->keepalives = mplx->c->keepalives;
task->pool = pool;
task->request = req;
- task->ser_headers = req->serialize;
- task->blocking = 1;
task->input.beam = input;
apr_thread_cond_create(&task->cond, pool);
@@ -738,43 +574,23 @@ void h2_task_destroy(h2_task *task)
}
}
-void h2_task_set_io_blocking(h2_task *task, int blocking)
-{
- task->blocking = blocking;
-}
-
apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread)
{
AP_DEBUG_ASSERT(task);
- task->input.block = APR_BLOCK_READ;
task->input.chunked = task->request->chunked;
- task->input.eos = !task->request->body;
- if (task->input.eos && !task->input.chunked && !task->ser_headers) {
- /* We do not serialize/chunk and have eos already, no need to
- * create a bucket brigade. */
- task->input.bb = NULL;
- task->input.eos_written = 1;
- }
- else {
- task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc);
- if (task->ser_headers) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
- "h2_task(%s): serialize request %s %s",
- task->id, task->request->method, task->request->path);
- apr_brigade_printf(task->input.bb, NULL,
- NULL, "%s %s HTTP/1.1\r\n",
- task->request->method, task->request->path);
- apr_table_do(input_ser_header, task, task->request->headers, NULL);
- apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
- }
- if (task->input.eos) {
- input_append_eos(task, NULL);
- }
+ task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc);
+ if (task->request->serialize) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ "h2_task(%s): serialize request %s %s",
+ task->id, task->request->method, task->request->path);
+ apr_brigade_printf(task->input.bb, NULL,
+ NULL, "%s %s HTTP/1.1\r\n",
+ task->request->method, task->request->path);
+ apr_table_do(input_ser_header, task, task->request->headers, NULL);
+ apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
}
- task->output.from_h1 = h2_from_h1_create(task->stream_id, task->pool);
-
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
"h2_task(%s): process connection", task->id);
task->c->current_thread = thread;
@@ -811,7 +627,6 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): start process_request", task->id);
- task->r = r;
ap_process_request(r);
if (task->frozen) {
@@ -819,10 +634,9 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)
"h2_task(%s): process_request frozen", task->id);
}
else {
- task->r = NULL;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "h2_task(%s): process_request done", task->id);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- "h2_task(%s): process_request done", task->id);
/* After the call to ap_process_request, the
* request pool will have been deleted. We set
@@ -858,7 +672,7 @@ static int h2_task_process_conn(conn_rec* c)
ctx = h2_ctx_get(c, 0);
if (h2_ctx_is_task(ctx)) {
- if (!ctx->task->ser_headers) {
+ if (!ctx->task->request->serialize) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_h2, processing request directly");
h2_task_process_request(ctx->task, c);
diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h
index 85b530fa3f..9c81a8693a 100644
--- a/modules/http2/h2_task.h
+++ b/modules/http2/h2_task.h
@@ -44,7 +44,6 @@ struct h2_mplx;
struct h2_task;
struct h2_req_engine;
struct h2_request;
-struct h2_response;
struct h2_worker;
typedef struct h2_task h2_task;
@@ -56,36 +55,29 @@ struct h2_task {
apr_pool_t *pool;
const struct h2_request *request;
- struct h2_response *response;
+ int rst_error; /* h2 related stream abort error */
struct {
struct h2_bucket_beam *beam;
- apr_bucket_brigade *bb;
- apr_read_type_e block;
unsigned int chunked : 1;
unsigned int eos : 1;
- unsigned int eos_written : 1;
+ apr_bucket_brigade *bb;
+ apr_bucket_brigade *bbchunk;
} input;
struct {
struct h2_bucket_beam *beam;
- struct h2_from_h1 *from_h1;
unsigned int opened : 1;
- unsigned int response_open : 1;
+ unsigned int sent_response : 1;
unsigned int copy_files : 1;
- apr_off_t written;
apr_bucket_brigade *bb;
} output;
struct h2_mplx *mplx;
struct apr_thread_cond_t *cond;
- int rst_error; /* h2 related stream abort error */
unsigned int filters_set : 1;
- unsigned int ser_headers : 1;
unsigned int frozen : 1;
- unsigned int blocking : 1;
unsigned int detached : 1;
- unsigned int response_sent : 1; /* a response has been sent to client */
unsigned int worker_started : 1; /* h2_worker started processing for this io */
unsigned int worker_done : 1; /* h2_worker finished for this io */
@@ -95,18 +87,16 @@ struct h2_task {
struct h2_req_engine *engine; /* engine hosted by this task */
struct h2_req_engine *assigned; /* engine that task has been assigned to */
- request_rec *r; /* request being processed in this task */
};
-h2_task *h2_task_create(conn_rec *c, const struct h2_request *req,
+h2_task *h2_task_create(conn_rec *c, apr_uint32_t stream_id,
+ const struct h2_request *req,
struct h2_bucket_beam *input, struct h2_mplx *mplx);
void h2_task_destroy(h2_task *task);
apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread);
-apr_status_t h2_task_add_response(h2_task *task, struct h2_response *response);
-
void h2_task_redo(h2_task *task);
int h2_task_can_redo(h2_task *task);
@@ -128,6 +118,4 @@ apr_status_t h2_task_freeze(h2_task *task);
apr_status_t h2_task_thaw(h2_task *task);
int h2_task_is_detached(h2_task *task);
-void h2_task_set_io_blocking(h2_task *task, int blocking);
-
#endif /* defined(__mod_h2__h2_task__) */
diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c
index 16c90a639a..f62bc6c8b7 100644
--- a/modules/http2/h2_util.c
+++ b/modules/http2/h2_util.c
@@ -903,11 +903,11 @@ apr_size_t h2_util_bucket_print(char *buffer, apr_size_t bmax,
off += apr_snprintf(buffer+off, bmax-off, "eor");
}
else {
- off += apr_snprintf(buffer+off, bmax-off, "meta(unknown)");
+ off += apr_snprintf(buffer+off, bmax-off, "%s", b->type->name);
}
}
else {
- const char *btype = "data";
+ const char *btype = b->type->name;
if (APR_BUCKET_IS_FILE(b)) {
btype = "file";
}
@@ -972,7 +972,8 @@ apr_size_t h2_util_bb_print(char *buffer, apr_size_t bmax,
apr_status_t h2_append_brigade(apr_bucket_brigade *to,
apr_bucket_brigade *from,
apr_off_t *plen,
- int *peos)
+ int *peos,
+ h2_bucket_gate *should_append)
{
apr_bucket *e;
apr_off_t len = 0, remain = *plen;
@@ -983,7 +984,10 @@ apr_status_t h2_append_brigade(apr_bucket_brigade *to,
while (!APR_BRIGADE_EMPTY(from)) {
e = APR_BRIGADE_FIRST(from);
- if (APR_BUCKET_IS_METADATA(e)) {
+ if (!should_append(e)) {
+ goto leave;
+ }
+ else if (APR_BUCKET_IS_METADATA(e)) {
if (APR_BUCKET_IS_EOS(e)) {
*peos = 1;
apr_bucket_delete(e);
@@ -1002,7 +1006,7 @@ apr_status_t h2_append_brigade(apr_bucket_brigade *to,
if (remain < e->length) {
if (remain <= 0) {
- return APR_SUCCESS;
+ goto leave;
}
apr_bucket_split(e, (apr_size_t)remain);
}
@@ -1013,7 +1017,7 @@ apr_status_t h2_append_brigade(apr_bucket_brigade *to,
len += e->length;
remain -= e->length;
}
-
+leave:
*plen = len;
return APR_SUCCESS;
}
@@ -1282,7 +1286,6 @@ h2_request *h2_req_create(int id, apr_pool_t *pool, const char *method,
{
h2_request *req = apr_pcalloc(pool, sizeof(h2_request));
- req->id = id;
req->method = method;
req->scheme = scheme;
req->authority = authority;
@@ -1380,11 +1383,11 @@ int h2_util_frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen)
/*******************************************************************************
* push policy
******************************************************************************/
-void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_enabled)
+int h2_push_policy_determine(apr_table_t *headers, apr_pool_t *p, int push_enabled)
{
h2_push_policy policy = H2_PUSH_NONE;
if (push_enabled) {
- const char *val = apr_table_get(req->headers, "accept-push-policy");
+ const char *val = apr_table_get(headers, "accept-push-policy");
if (val) {
if (ap_find_token(p, val, "fast-load")) {
policy = H2_PUSH_FAST_LOAD;
@@ -1407,6 +1410,6 @@ void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_en
policy = H2_PUSH_DEFAULT;
}
}
- req->push_policy = policy;
+ return policy;
}
diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h
index 8d2c195840..024c31381e 100644
--- a/modules/http2/h2_util.h
+++ b/modules/http2/h2_util.h
@@ -198,11 +198,12 @@ int h2_proxy_res_ignore_header(const char *name, size_t len);
* account, see draft https://tools.ietf.org/html/draft-ruellan-http-accept-push-policy-00
* for details.
*
- * @param req the request to determine the policy for
+ * @param headers the http headers to inspect
* @param p the pool to use
* @param push_enabled if HTTP/2 server push is generally enabled for this request
+ * @return the push policy desired
*/
-void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_enabled);
+int h2_push_policy_determine(apr_table_t *headers, apr_pool_t *p, int push_enabled);
/*******************************************************************************
* base64 url encoding, different table from normal base64
@@ -352,11 +353,12 @@ do { \
const char *line = "(null)"; \
apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \
len = h2_util_bb_print(buffer, bmax, (tag), "", (bb)); \
- ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%ld-%d): %s", \
- (c)->id, (int)(sid), (len? buffer : line)); \
+ ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%s): %s", \
+ (c)->log_id, (len? buffer : line)); \
} while(0)
+typedef int h2_bucket_gate(apr_bucket *b);
/**
* Transfer buckets from one brigade to another with a limit on the
* maximum amount of bytes transferred. Does no setaside magic, lifetime
@@ -369,7 +371,8 @@ do { \
apr_status_t h2_append_brigade(apr_bucket_brigade *to,
apr_bucket_brigade *from,
apr_off_t *plen,
- int *peos);
+ int *peos,
+ h2_bucket_gate *should_append);
/**
* Get an approximnation of the memory footprint of the given
diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h
index d34ee06e9f..37d8e27a79 100644
--- a/modules/http2/h2_version.h
+++ b/modules/http2/h2_version.h
@@ -26,7 +26,7 @@
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.6.3-DEV"
+#define MOD_HTTP2_VERSION "1.7.0-DEV"
/**
* @macro
@@ -34,7 +34,7 @@
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010603
+#define MOD_HTTP2_VERSION_NUM 0x010700
#endif /* mod_h2_h2_version_h */
diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c
index 65ea3d6aa4..8cf6e2de69 100644
--- a/modules/http2/mod_http2.c
+++ b/modules/http2/mod_http2.c
@@ -233,8 +233,11 @@ static const char *val_H2_PUSH(apr_pool_t *p, server_rec *s,
if (ctx) {
if (r) {
h2_task *task = h2_ctx_get_task(ctx);
- if (task && task->request->push_policy != H2_PUSH_NONE) {
- return "on";
+ if (task) {
+ h2_stream *stream = h2_mplx_stream_get(task->mplx, task->stream_id);
+ if (stream && stream->push_policy != H2_PUSH_NONE) {
+ return "on";
+ }
}
}
else if (c && h2_session_push_enabled(ctx->session)) {
@@ -268,7 +271,10 @@ static const char *val_H2_PUSHED_ON(apr_pool_t *p, server_rec *s,
if (ctx) {
h2_task *task = h2_ctx_get_task(ctx);
if (task && !H2_STREAM_CLIENT_INITIATED(task->stream_id)) {
- return apr_itoa(p, task->request->initiated_on);
+ h2_stream *stream = h2_mplx_stream_get(task->mplx, task->stream_id);
+ if (stream) {
+ return apr_itoa(p, stream->initiated_on);
+ }
}
}
return "";
diff --git a/modules/http2/mod_http2.dsp b/modules/http2/mod_http2.dsp
index 949414872c..f26607470d 100644
--- a/modules/http2/mod_http2.dsp
+++ b/modules/http2/mod_http2.dsp
@@ -161,7 +161,7 @@ SOURCE=./h2_request.c
# End Source File
# Begin Source File
-SOURCE=./h2_response.c
+SOURCE=./h2_headers.c
# End Source File
# Begin Source File