diff options
author | Graham Leggett <minfrin@apache.org> | 2015-10-04 10:10:51 +0000 |
---|---|---|
committer | Graham Leggett <minfrin@apache.org> | 2015-10-04 10:10:51 +0000 |
commit | 615f97f93364fd7189ce973478266ce3d229d76b (patch) | |
tree | 84b9f0601b3a3ba6ecb0caf794378d7019f850e5 /server/util_filter.c | |
parent | cd6aa9bc3bc2391839c64c8778ed5d7ec0d220d7 (diff) | |
download | httpd-615f97f93364fd7189ce973478266ce3d229d76b.tar.gz |
core: Extend support for asynchronous write completion from the
network filter to any connection or request filter.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1706669 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'server/util_filter.c')
-rw-r--r-- | server/util_filter.c | 263 |
1 files changed, 262 insertions, 1 deletions
diff --git a/server/util_filter.c b/server/util_filter.c index 01eb533520..ad14112ddb 100644 --- a/server/util_filter.c +++ b/server/util_filter.c @@ -24,6 +24,7 @@ #include "http_config.h" #include "http_core.h" #include "http_log.h" +#include "http_request.h" #include "util_filter.h" /* NOTE: Apache's current design doesn't allow a pool to be passed thru, @@ -32,6 +33,10 @@ #define FILTER_POOL apr_hook_global_pool #include "ap_hooks.h" /* for apr_hook_global_pool */ +/* XXX: Should these be configurable parameters? */ +#define THRESHOLD_MAX_BUFFER 65536 +#define MAX_REQUESTS_IN_PIPELINE 5 + /* ** This macro returns true/false if a given filter should be inserted BEFORE ** another filter. This will happen when one of: 1) there isn't another @@ -319,6 +324,8 @@ static ap_filter_t *add_any_filter_handle(ap_filter_rec_t *frec, void *ctx, f->r = frec->ftype < AP_FTYPE_CONNECTION ? r : NULL; f->c = c; f->next = NULL; + f->bb = NULL; + f->deferred_pool = NULL; if (INSERT_BEFORE(f, *outf)) { f->next = *outf; @@ -474,6 +481,16 @@ AP_DECLARE(void) ap_remove_input_filter(ap_filter_t *f) AP_DECLARE(void) ap_remove_output_filter(ap_filter_t *f) { + + if ((f->bb) && !APR_BRIGADE_EMPTY(f->bb)) { + apr_brigade_cleanup(f->bb); + } + + if (f->deferred_pool) { + apr_pool_destroy(f->deferred_pool); + f->deferred_pool = NULL; + } + remove_any_filter(f, f->r ? &f->r->output_filters : NULL, f->r ? &f->r->proto_output_filters : NULL, &f->c->output_filters); @@ -566,6 +583,7 @@ AP_DECLARE(apr_status_t) ap_pass_brigade(ap_filter_t *next, { if (next) { apr_bucket *e; + if ((e = APR_BRIGADE_LAST(bb)) && APR_BUCKET_IS_EOS(e) && next->r) { /* This is only safe because HTTP_HEADER filter is always in * the filter stack. This ensures that there is ALWAYS a @@ -635,7 +653,8 @@ AP_DECLARE(apr_status_t) ap_save_brigade(ap_filter_t *f, apr_status_t rv, srv = APR_SUCCESS; /* If have never stored any data in the filter, then we had better - * create an empty bucket brigade so that we can concat. + * create an empty bucket brigade so that we can concat. Register + * a cleanup to zero out the pointer if the pool is cleared. */ if (!(*saveto)) { *saveto = apr_brigade_create(p, f->c->bucket_alloc); @@ -673,6 +692,248 @@ AP_DECLARE(apr_status_t) ap_save_brigade(ap_filter_t *f, return srv; } +static apr_status_t filters_cleanup(void *data) +{ + ap_filter_t **key = data; + + apr_hash_set((*key)->c->filters, key, sizeof(ap_filter_t **), NULL); + + return APR_SUCCESS; +} + +AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f, + apr_bucket_brigade *bb) +{ + int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX); + + if (loglevel >= APLOG_TRACE6) { + ap_log_cerror( + APLOG_MARK, APLOG_TRACE6, 0, f->c, + "setaside %s brigade to %s brigade in '%s' output filter", + (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"), + (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"), f->frec->name); + } + + if (!APR_BRIGADE_EMPTY(bb)) { + apr_pool_t *pool; + /* + * Set aside the brigade bb within f->bb. + */ + if (!f->bb) { + ap_filter_t **key; + + pool = f->r ? f->r->pool : f->c->pool; + + key = apr_palloc(pool, sizeof(ap_filter_t **)); + *key = f; + apr_hash_set(f->c->filters, key, sizeof(ap_filter_t **), f); + + f->bb = apr_brigade_create(pool, f->c->bucket_alloc); + + apr_pool_pre_cleanup_register(pool, key, filters_cleanup); + + } + + /* decide what pool we setaside to, request pool or deferred pool? */ + if (f->r) { + pool = f->r->pool; + APR_BRIGADE_CONCAT(f->bb, bb); + } + else { + if (!f->deferred_pool) { + apr_pool_create(&f->deferred_pool, f->c->pool); + apr_pool_tag(f->deferred_pool, "deferred_pool"); + } + pool = f->deferred_pool; + return ap_save_brigade(f, &f->bb, &bb, pool); + } + + } + else if (f->deferred_pool) { + /* + * There are no more requests in the pipeline. We can just clear the + * pool. + */ + apr_brigade_cleanup(f->bb); + apr_pool_clear(f->deferred_pool); + } + return APR_SUCCESS; +} + +AP_DECLARE(apr_status_t) ap_filter_reinstate_brigade(ap_filter_t *f, + apr_bucket_brigade *bb, + apr_bucket **flush_upto) +{ + apr_bucket *bucket, *next; + apr_size_t bytes_in_brigade, non_file_bytes_in_brigade; + int eor_buckets_in_brigade, morphing_bucket_in_brigade; + int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX); + + if (loglevel >= APLOG_TRACE6) { + ap_log_cerror( + APLOG_MARK, APLOG_TRACE6, 0, f->c, + "reinstate %s brigade to %s brigade in '%s' output filter", + (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"), + (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"), f->frec->name); + } + + if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) { + APR_BRIGADE_PREPEND(bb, f->bb); + } + + /* + * Determine if and up to which bucket we need to do a blocking write: + * + * a) The brigade contains a flush bucket: Do a blocking write + * of everything up that point. + * + * b) The request is in CONN_STATE_HANDLER state, and the brigade + * contains at least THRESHOLD_MAX_BUFFER bytes in non-file + * buckets: Do blocking writes until the amount of data in the + * buffer is less than THRESHOLD_MAX_BUFFER. (The point of this + * rule is to provide flow control, in case a handler is + * streaming out lots of data faster than the data can be + * sent to the client.) + * + * c) The request is in CONN_STATE_HANDLER state, and the brigade + * contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets: + * Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR + * buckets are left. (The point of this rule is to prevent too many + * FDs being kept open by pipelined requests, possibly allowing a + * DoS). + * + * d) The request is being served by a connection filter and the + * brigade contains a morphing bucket: If there was no other + * reason to do a blocking write yet, try reading the bucket. If its + * contents fit into memory before THRESHOLD_MAX_BUFFER is reached, + * everything is fine. Otherwise we need to do a blocking write the + * up to and including the morphing bucket, because ap_save_brigade() + * would read the whole bucket into memory later on. + */ + + *flush_upto = NULL; + + bytes_in_brigade = 0; + non_file_bytes_in_brigade = 0; + eor_buckets_in_brigade = 0; + morphing_bucket_in_brigade = 0; + + for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb); + bucket = next) { + next = APR_BUCKET_NEXT(bucket); + + if (!APR_BUCKET_IS_METADATA(bucket)) { + if (bucket->length == (apr_size_t)-1) { + /* + * A setaside of morphing buckets would read everything into + * memory. Instead, we will flush everything up to and + * including this bucket. + */ + morphing_bucket_in_brigade = 1; + } + else { + bytes_in_brigade += bucket->length; + if (!APR_BUCKET_IS_FILE(bucket)) + non_file_bytes_in_brigade += bucket->length; + } + } + else if (AP_BUCKET_IS_EOR(bucket)) { + eor_buckets_in_brigade++; + } + + if (APR_BUCKET_IS_FLUSH(bucket) + || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER + || (!f->r && morphing_bucket_in_brigade) + || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) { + /* this segment of the brigade MUST be sent before returning. */ + + if (loglevel >= APLOG_TRACE6) { + char *reason = APR_BUCKET_IS_FLUSH(bucket) ? + "FLUSH bucket" : + (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ? + "THRESHOLD_MAX_BUFFER" : + (!f->r && morphing_bucket_in_brigade) ? "morphing bucket" : + "MAX_REQUESTS_IN_PIPELINE"; + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c, + "will flush because of %s", reason); + ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c, + "seen in brigade%s: bytes: %" APR_SIZE_T_FMT + ", non-file bytes: %" APR_SIZE_T_FMT ", eor " + "buckets: %d, morphing buckets: %d", + flush_upto == NULL ? " so far" + : " since last flush point", + bytes_in_brigade, + non_file_bytes_in_brigade, + eor_buckets_in_brigade, + morphing_bucket_in_brigade); + } + /* + * Defer the actual blocking write to avoid doing many writes. + */ + *flush_upto = next; + + bytes_in_brigade = 0; + non_file_bytes_in_brigade = 0; + eor_buckets_in_brigade = 0; + morphing_bucket_in_brigade = 0; + } + } + + if (loglevel >= APLOG_TRACE8) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c, + "brigade contains: bytes: %" APR_SIZE_T_FMT + ", non-file bytes: %" APR_SIZE_T_FMT + ", eor buckets: %d, morphing buckets: %d", + bytes_in_brigade, non_file_bytes_in_brigade, + eor_buckets_in_brigade, morphing_bucket_in_brigade); + } + + return APR_SUCCESS; +} + +AP_DECLARE(int) ap_filter_should_yield(ap_filter_t *f) +{ + /* + * This function decides whether a filter should yield due to buffered + * data in a downstream filter. If a downstream filter buffers we + * must back off so we don't overwhelm the server. If this function + * returns true, the filter should call ap_filter_setaside_brigade() + * to save unprocessed buckets, and then reinstate those buckets on + * the next call with ap_filter_reinstate_brigade() and continue + * where it left off. + * + * If this function is forced to return zero, we return back to + * synchronous filter behaviour. + * + * Subrequests present us with a problem - we don't know how much data + * they will produce and therefore how much buffering we'll need, and + * if a subrequest had to trigger buffering, but next subrequest wouldn't + * know when the previous one had finished sending data and buckets + * could be sent out of order. + * + * In the case of subrequests, deny the ability to yield. When the data + * reaches the filters from the main request, they will be setaside + * there in the right order and the request will be given the + * opportunity to yield. + */ + if (f->r && f->r->main) { + return 0; + } + + /* + * This is either a main request or internal redirect, or it is a + * connection filter. Yield if there is any buffered data downstream + * from us. + */ + while (f) { + if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) { + return 1; + } + f = f->next; + } + return 0; +} + AP_DECLARE_NONSTD(apr_status_t) ap_filter_flush(apr_bucket_brigade *bb, void *ctx) { |