diff options
Diffstat (limited to 'buckets/aggregate_buckets.c')
-rw-r--r-- | buckets/aggregate_buckets.c | 400 |
1 files changed, 400 insertions, 0 deletions
diff --git a/buckets/aggregate_buckets.c b/buckets/aggregate_buckets.c new file mode 100644 index 0000000..d9d15a3 --- /dev/null +++ b/buckets/aggregate_buckets.c @@ -0,0 +1,400 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "serf.h" +#include "serf_bucket_util.h" + + +/* Should be an APR_RING? */ +typedef struct bucket_list { + serf_bucket_t *bucket; + struct bucket_list *next; +} bucket_list_t; + +typedef struct { + bucket_list_t *list; /* active buckets */ + bucket_list_t *last; /* last bucket of the list */ + bucket_list_t *done; /* we finished reading this; now pending a destroy */ + + serf_bucket_aggregate_eof_t hold_open; + void *hold_open_baton; + + /* Does this bucket own its children? !0 if yes, 0 if not. */ + int bucket_owner; +} aggregate_context_t; + + +static void cleanup_aggregate(aggregate_context_t *ctx, + serf_bucket_alloc_t *allocator) +{ + bucket_list_t *next_list; + + /* If we finished reading a bucket during the previous read, then + * we can now toss that bucket. + */ + while (ctx->done != NULL) { + next_list = ctx->done->next; + + if (ctx->bucket_owner) { + serf_bucket_destroy(ctx->done->bucket); + } + serf_bucket_mem_free(allocator, ctx->done); + + ctx->done = next_list; + } +} + +void serf_bucket_aggregate_cleanup( + serf_bucket_t *bucket, serf_bucket_alloc_t *allocator) +{ + aggregate_context_t *ctx = bucket->data; + + cleanup_aggregate(ctx, allocator); +} + +static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator) +{ + aggregate_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + + ctx->list = NULL; + ctx->last = NULL; + ctx->done = NULL; + ctx->hold_open = NULL; + ctx->hold_open_baton = NULL; + ctx->bucket_owner = 1; + + return ctx; +} + +serf_bucket_t *serf_bucket_aggregate_create( + serf_bucket_alloc_t *allocator) +{ + aggregate_context_t *ctx; + + ctx = create_aggregate(allocator); + + return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx); +} + +serf_bucket_t *serf__bucket_stream_create( + serf_bucket_alloc_t *allocator, + serf_bucket_aggregate_eof_t fn, + void *baton) +{ + serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator); + aggregate_context_t *ctx = bucket->data; + + serf_bucket_aggregate_hold_open(bucket, fn, baton); + + ctx->bucket_owner = 0; + + return bucket; +} + + +static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket) +{ + aggregate_context_t *ctx = bucket->data; + bucket_list_t *next_ctx; + + while (ctx->list) { + if (ctx->bucket_owner) { + serf_bucket_destroy(ctx->list->bucket); + } + next_ctx = ctx->list->next; + serf_bucket_mem_free(bucket->allocator, ctx->list); + ctx->list = next_ctx; + } + cleanup_aggregate(ctx, bucket->allocator); + + serf_default_destroy_and_data(bucket); +} + +void serf_bucket_aggregate_become(serf_bucket_t *bucket) +{ + aggregate_context_t *ctx; + + ctx = create_aggregate(bucket->allocator); + + bucket->type = &serf_bucket_type_aggregate; + bucket->data = ctx; + + /* The allocator remains the same. */ +} + + +void serf_bucket_aggregate_prepend( + serf_bucket_t *aggregate_bucket, + serf_bucket_t *prepend_bucket) +{ + aggregate_context_t *ctx = aggregate_bucket->data; + bucket_list_t *new_list; + + new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator, + sizeof(*new_list)); + new_list->bucket = prepend_bucket; + new_list->next = ctx->list; + + ctx->list = new_list; +} + +void serf_bucket_aggregate_append( + serf_bucket_t *aggregate_bucket, + serf_bucket_t *append_bucket) +{ + aggregate_context_t *ctx = aggregate_bucket->data; + bucket_list_t *new_list; + + new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator, + sizeof(*new_list)); + new_list->bucket = append_bucket; + new_list->next = NULL; + + /* If we use APR_RING, this is trivial. So, wait. + new_list->next = ctx->list; + ctx->list = new_list; + */ + if (ctx->list == NULL) { + ctx->list = new_list; + ctx->last = new_list; + } + else { + ctx->last->next = new_list; + ctx->last = ctx->last->next; + } +} + +void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket, + serf_bucket_aggregate_eof_t fn, + void *baton) +{ + aggregate_context_t *ctx = aggregate_bucket->data; + ctx->hold_open = fn; + ctx->hold_open_baton = baton; +} + +void serf_bucket_aggregate_prepend_iovec( + serf_bucket_t *aggregate_bucket, + struct iovec *vecs, + int vecs_count) +{ + int i; + + /* Add in reverse order. */ + for (i = vecs_count - 1; i >= 0; i--) { + serf_bucket_t *new_bucket; + + new_bucket = serf_bucket_simple_create(vecs[i].iov_base, + vecs[i].iov_len, + NULL, NULL, + aggregate_bucket->allocator); + + serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket); + + } +} + +void serf_bucket_aggregate_append_iovec( + serf_bucket_t *aggregate_bucket, + struct iovec *vecs, + int vecs_count) +{ + serf_bucket_t *new_bucket; + + new_bucket = serf_bucket_iovec_create(vecs, vecs_count, + aggregate_bucket->allocator); + + serf_bucket_aggregate_append(aggregate_bucket, new_bucket); +} + +static apr_status_t read_aggregate(serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, struct iovec *vecs, + int *vecs_used) +{ + aggregate_context_t *ctx = bucket->data; + int cur_vecs_used; + apr_status_t status; + + *vecs_used = 0; + + if (!ctx->list) { + if (ctx->hold_open) { + return ctx->hold_open(ctx->hold_open_baton, bucket); + } + else { + return APR_EOF; + } + } + + status = APR_SUCCESS; + while (requested) { + serf_bucket_t *head = ctx->list->bucket; + + status = serf_bucket_read_iovec(head, requested, vecs_size, vecs, + &cur_vecs_used); + + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + /* Add the number of vecs we read to our running total. */ + *vecs_used += cur_vecs_used; + + if (cur_vecs_used > 0 || status) { + bucket_list_t *next_list; + + /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now + * as it isn't safe to read more without returning to our caller. + */ + if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) { + return status; + } + + /* However, if we read EOF, we can stash this bucket in a + * to-be-freed list and move on to the next bucket. This ensures + * that the bucket stays alive (so as not to violate our read + * semantics). We'll destroy this list of buckets the next time + * we are asked to perform a read operation - thus ensuring the + * proper read lifetime. + */ + next_list = ctx->list->next; + ctx->list->next = ctx->done; + ctx->done = ctx->list; + ctx->list = next_list; + + /* If we have no more in our list, return EOF. */ + if (!ctx->list) { + if (ctx->hold_open) { + return ctx->hold_open(ctx->hold_open_baton, bucket); + } + else { + return APR_EOF; + } + } + + /* At this point, it safe to read the next bucket - if we can. */ + + /* If the caller doesn't want ALL_AVAIL, decrement the size + * of the items we just read from the list. + */ + if (requested != SERF_READ_ALL_AVAIL) { + int i; + + for (i = 0; i < cur_vecs_used; i++) + requested -= vecs[i].iov_len; + } + + /* Adjust our vecs to account for what we just read. */ + vecs_size -= cur_vecs_used; + vecs += cur_vecs_used; + + /* We reached our max. Oh well. */ + if (!requested || !vecs_size) { + return APR_SUCCESS; + } + } + } + + return status; +} + +static apr_status_t serf_aggregate_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + aggregate_context_t *ctx = bucket->data; + struct iovec vec; + int vecs_used; + apr_status_t status; + + cleanup_aggregate(ctx, bucket->allocator); + + status = read_aggregate(bucket, requested, 1, &vec, &vecs_used); + + if (!vecs_used) { + *len = 0; + } + else { + *data = vec.iov_base; + *len = vec.iov_len; + } + + return status; +} + +static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, + struct iovec *vecs, + int *vecs_used) +{ + aggregate_context_t *ctx = bucket->data; + + cleanup_aggregate(ctx, bucket->allocator); + + return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used); +} + +static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + /* Follow pattern from serf_aggregate_read. */ + return APR_ENOTIMPL; +} + +static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + /* Follow pattern from serf_aggregate_read. */ + return APR_ENOTIMPL; +} + +static serf_bucket_t * serf_aggregate_read_bucket( + serf_bucket_t *bucket, + const serf_bucket_type_t *type) +{ + aggregate_context_t *ctx = bucket->data; + serf_bucket_t *found_bucket; + + if (!ctx->list) { + return NULL; + } + + if (ctx->list->bucket->type == type) { + /* Got the bucket. Consume it from our list. */ + found_bucket = ctx->list->bucket; + ctx->list = ctx->list->next; + return found_bucket; + } + + /* Call read_bucket on first one in our list. */ + return serf_bucket_read_bucket(ctx->list->bucket, type); +} + + +const serf_bucket_type_t serf_bucket_type_aggregate = { + "AGGREGATE", + serf_aggregate_read, + serf_aggregate_readline, + serf_aggregate_read_iovec, + serf_default_read_for_sendfile, + serf_aggregate_read_bucket, + serf_aggregate_peek, + serf_aggregate_destroy_and_data, +}; |