Diffstat (limited to 'buckets/aggregate_buckets.c')
-rw-r--r--  buckets/aggregate_buckets.c  400
1 file changed, 400 insertions, 0 deletions
diff --git a/buckets/aggregate_buckets.c b/buckets/aggregate_buckets.c
new file mode 100644
index 0000000..d9d15a3
--- /dev/null
+++ b/buckets/aggregate_buckets.c
@@ -0,0 +1,400 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+/* Should be an APR_RING? */
+typedef struct bucket_list {
+    serf_bucket_t *bucket;
+    struct bucket_list *next;
+} bucket_list_t;
+
+typedef struct {
+    bucket_list_t *list; /* active buckets */
+    bucket_list_t *last; /* last bucket of the list */
+    bucket_list_t *done; /* we finished reading this; now pending a destroy */
+
+    serf_bucket_aggregate_eof_t hold_open;
+    void *hold_open_baton;
+
+    /* Does this bucket own its children? !0 if yes, 0 if not. */
+    int bucket_owner;
+} aggregate_context_t;
+
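+/* Rough usage sketch (illustrative only, not part of this file; `allocator'
+ * is assumed to be a serf_bucket_alloc_t the caller already owns).  Each
+ * read hands back data from at most one child, so callers loop until the
+ * aggregate reports APR_EOF:
+ *
+ *   serf_bucket_t *agg = serf_bucket_aggregate_create(allocator);
+ *   apr_status_t status;
+ *   const char *data;
+ *   apr_size_t len;
+ *
+ *   serf_bucket_aggregate_append(agg,
+ *       serf_bucket_simple_create("Hello, ", 7, NULL, NULL, allocator));
+ *   serf_bucket_aggregate_append(agg,
+ *       serf_bucket_simple_create("world!", 6, NULL, NULL, allocator));
+ *
+ *   do {
+ *       status = serf_bucket_read(agg, SERF_READ_ALL_AVAIL, &data, &len);
+ *   } while (status == APR_SUCCESS);
+ *
+ *   serf_bucket_destroy(agg);
+ */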
+
+static void cleanup_aggregate(aggregate_context_t *ctx,
+                              serf_bucket_alloc_t *allocator)
+{
+    bucket_list_t *next_list;
+
+    /* If we finished reading a bucket during the previous read, then
+     * we can now toss that bucket.
+     */
+    while (ctx->done != NULL) {
+        next_list = ctx->done->next;
+
+        if (ctx->bucket_owner) {
+            serf_bucket_destroy(ctx->done->bucket);
+        }
+        serf_bucket_mem_free(allocator, ctx->done);
+
+        ctx->done = next_list;
+    }
+}
+
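+/* Public wrapper: lets callers flush the pending-destroy list on demand,
+ * without waiting for the next read on this aggregate.
+ */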
+void serf_bucket_aggregate_cleanup(
+    serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
+{
+    aggregate_context_t *ctx = bucket->data;
+
+    cleanup_aggregate(ctx, allocator);
+}
+
+static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
+{
+    aggregate_context_t *ctx;
+
+    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+
+    ctx->list = NULL;
+    ctx->last = NULL;
+    ctx->done = NULL;
+    ctx->hold_open = NULL;
+    ctx->hold_open_baton = NULL;
+    ctx->bucket_owner = 1;
+
+    return ctx;
+}
+
+serf_bucket_t *serf_bucket_aggregate_create(
+    serf_bucket_alloc_t *allocator)
+{
+    aggregate_context_t *ctx;
+
+    ctx = create_aggregate(allocator);
+
+    return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
+}
+
+serf_bucket_t *serf__bucket_stream_create(
+    serf_bucket_alloc_t *allocator,
+    serf_bucket_aggregate_eof_t fn,
+    void *baton)
+{
+    serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
+    aggregate_context_t *ctx = bucket->data;
+
+    serf_bucket_aggregate_hold_open(bucket, fn, baton);
+
+    ctx->bucket_owner = 0;
+
+    return bucket;
+}
+
+
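+/* Destroy any child buckets this aggregate still owns, free their list
+ * nodes and the pending-destroy list, then release the context and the
+ * bucket itself.
+ */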
+static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
+{
+    aggregate_context_t *ctx = bucket->data;
+    bucket_list_t *next_ctx;
+
+    while (ctx->list) {
+        if (ctx->bucket_owner) {
+            serf_bucket_destroy(ctx->list->bucket);
+        }
+        next_ctx = ctx->list->next;
+        serf_bucket_mem_free(bucket->allocator, ctx->list);
+        ctx->list = next_ctx;
+    }
+    cleanup_aggregate(ctx, bucket->allocator);
+
+    serf_default_destroy_and_data(bucket);
+}
+
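+/* Morph an existing bucket, in place, into an empty aggregate bucket.
+ * The previous type's private data is simply overwritten, so the caller
+ * must have released it beforehand.
+ */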
+void serf_bucket_aggregate_become(serf_bucket_t *bucket)
+{
+    aggregate_context_t *ctx;
+
+    ctx = create_aggregate(bucket->allocator);
+
+    bucket->type = &serf_bucket_type_aggregate;
+    bucket->data = ctx;
+
+    /* The allocator remains the same. */
+}
+
+
+void serf_bucket_aggregate_prepend(
+    serf_bucket_t *aggregate_bucket,
+    serf_bucket_t *prepend_bucket)
+{
+    aggregate_context_t *ctx = aggregate_bucket->data;
+    bucket_list_t *new_list;
+
+    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
+                                     sizeof(*new_list));
+    new_list->bucket = prepend_bucket;
+    new_list->next = ctx->list;
+
+    /* If the list was empty, this bucket is also the last one. */
+    if (ctx->list == NULL) {
+        ctx->last = new_list;
+    }
+    ctx->list = new_list;
+}
+
+void serf_bucket_aggregate_append(
+    serf_bucket_t *aggregate_bucket,
+    serf_bucket_t *append_bucket)
+{
+    aggregate_context_t *ctx = aggregate_bucket->data;
+    bucket_list_t *new_list;
+
+    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
+                                     sizeof(*new_list));
+    new_list->bucket = append_bucket;
+    new_list->next = NULL;
+
+    /* If we use APR_RING, this is trivial. So, wait.
+    new_list->next = ctx->list;
+    ctx->list = new_list;
+    */
+    if (ctx->list == NULL) {
+        ctx->list = new_list;
+        ctx->last = new_list;
+    }
+    else {
+        ctx->last->next = new_list;
+        ctx->last = ctx->last->next;
+    }
+}
+
+void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket,
+                                     serf_bucket_aggregate_eof_t fn,
+                                     void *baton)
+{
+    aggregate_context_t *ctx = aggregate_bucket->data;
+    ctx->hold_open = fn;
+    ctx->hold_open_baton = baton;
+}
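+
+/* Rough sketch of a hold_open callback (illustrative; `my_stream_t' and
+ * its `finished' flag are hypothetical producer state).  read_aggregate()
+ * below invokes the callback whenever the active list runs dry, so the
+ * callback can report "more data is coming" instead of EOF:
+ *
+ *   static apr_status_t hold_open_cb(void *baton, serf_bucket_t *agg)
+ *   {
+ *       my_stream_t *stream = baton;
+ *
+ *       if (stream->finished)
+ *           return APR_EOF;
+ *
+ *       return APR_EAGAIN;
+ *   }
+ */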
+
+void serf_bucket_aggregate_prepend_iovec(
+    serf_bucket_t *aggregate_bucket,
+    struct iovec *vecs,
+    int vecs_count)
+{
+    int i;
+
+    /* Add in reverse order. */
+    for (i = vecs_count - 1; i >= 0; i--) {
+        serf_bucket_t *new_bucket;
+
+        new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
+                                               vecs[i].iov_len,
+                                               NULL, NULL,
+                                               aggregate_bucket->allocator);
+
+        serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
+
+    }
+}
+
+void serf_bucket_aggregate_append_iovec(
+    serf_bucket_t *aggregate_bucket,
+    struct iovec *vecs,
+    int vecs_count)
+{
+    serf_bucket_t *new_bucket;
+
+    new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
+                                          aggregate_bucket->allocator);
+
+    serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
+}
+
+static apr_status_t read_aggregate(serf_bucket_t *bucket,
+                                   apr_size_t requested,
+                                   int vecs_size, struct iovec *vecs,
+                                   int *vecs_used)
+{
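+    /* Walk the child buckets in order, filling the caller's iovec array.
+     * A child that reaches EOF is moved to the pending-destroy list
+     * rather than destroyed immediately, so any data it handed out stays
+     * valid until the next read-style call (see cleanup_aggregate).
+     */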
+    aggregate_context_t *ctx = bucket->data;
+    int cur_vecs_used;
+    apr_status_t status;
+
+    *vecs_used = 0;
+
+    if (!ctx->list) {
+        if (ctx->hold_open) {
+            return ctx->hold_open(ctx->hold_open_baton, bucket);
+        }
+        else {
+            return APR_EOF;
+        }
+    }
+
+    status = APR_SUCCESS;
+    while (requested) {
+        serf_bucket_t *head = ctx->list->bucket;
+
+        status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
+                                        &cur_vecs_used);
+
+        if (SERF_BUCKET_READ_ERROR(status))
+            return status;
+
+        /* Add the number of vecs we read to our running total. */
+        *vecs_used += cur_vecs_used;
+
+        if (cur_vecs_used > 0 || status) {
+            bucket_list_t *next_list;
+
+            /* If we got SUCCESS (w/bytes), EAGAIN or WAIT_CONN, we want to
+             * return now as it isn't safe to read more without returning
+             * to our caller.
+             */
+            if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
+                return status;
+            }
+
+            /* However, if we read EOF, we can stash this bucket in a
+             * to-be-freed list and move on to the next bucket. This ensures
+             * that the bucket stays alive (so as not to violate our read
+             * semantics). We'll destroy this list of buckets the next time
+             * we are asked to perform a read operation - thus ensuring the
+             * proper read lifetime.
+             */
+            next_list = ctx->list->next;
+            ctx->list->next = ctx->done;
+            ctx->done = ctx->list;
+            ctx->list = next_list;
+
+            /* If we have no more in our list, return EOF. */
+            if (!ctx->list) {
+                if (ctx->hold_open) {
+                    return ctx->hold_open(ctx->hold_open_baton, bucket);
+                }
+                else {
+                    return APR_EOF;
+                }
+            }
+
+            /* At this point, it is safe to read the next bucket - if we can. */
+
+            /* If the caller doesn't want ALL_AVAIL, decrement the size
+             * of the items we just read from the list.
+             */
+            if (requested != SERF_READ_ALL_AVAIL) {
+                int i;
+
+                for (i = 0; i < cur_vecs_used; i++)
+                    requested -= vecs[i].iov_len;
+            }
+
+            /* Adjust our vecs to account for what we just read. */
+            vecs_size -= cur_vecs_used;
+            vecs += cur_vecs_used;
+
+            /* We reached our max. Oh well. */
+            if (!requested || !vecs_size) {
+                return APR_SUCCESS;
+            }
+        }
+    }
+
+    return status;
+}
+
+static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
+                                        apr_size_t requested,
+                                        const char **data, apr_size_t *len)
+{
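+    /* A plain read is an iovec read with a single vector: read_aggregate()
+     * fills at most one vec, which maps directly onto *data and *len.
+     */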
+    aggregate_context_t *ctx = bucket->data;
+    struct iovec vec;
+    int vecs_used;
+    apr_status_t status;
+
+    cleanup_aggregate(ctx, bucket->allocator);
+
+    status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
+
+    if (!vecs_used) {
+        *len = 0;
+    }
+    else {
+        *data = vec.iov_base;
+        *len = vec.iov_len;
+    }
+
+    return status;
+}
+
+static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
+                                              apr_size_t requested,
+                                              int vecs_size,
+                                              struct iovec *vecs,
+                                              int *vecs_used)
+{
+    aggregate_context_t *ctx = bucket->data;
+
+    cleanup_aggregate(ctx, bucket->allocator);
+
+    return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
+}
+
+static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
+                                            int acceptable, int *found,
+                                            const char **data, apr_size_t *len)
+{
+    /* Not yet implemented; would follow the pattern of serf_aggregate_read. */
+    return APR_ENOTIMPL;
+}
+
+static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
+                                        const char **data,
+                                        apr_size_t *len)
+{
+    /* Not yet implemented; would follow the pattern of serf_aggregate_read. */
+    return APR_ENOTIMPL;
+}
+
+static serf_bucket_t * serf_aggregate_read_bucket(
+    serf_bucket_t *bucket,
+    const serf_bucket_type_t *type)
+{
+    aggregate_context_t *ctx = bucket->data;
+    serf_bucket_t *found_bucket;
+
+    if (!ctx->list) {
+        return NULL;
+    }
+
+    if (ctx->list->bucket->type == type) {
+        bucket_list_t *consumed = ctx->list;
+
+        /* Got the bucket. Consume it from our list, and free the list
+         * node now that it no longer holds anything.
+         */
+        found_bucket = consumed->bucket;
+        ctx->list = consumed->next;
+        serf_bucket_mem_free(bucket->allocator, consumed);
+        return found_bucket;
+    }
+
+    /* Call read_bucket on first one in our list. */
+    return serf_bucket_read_bucket(ctx->list->bucket, type);
+}
+
+
+const serf_bucket_type_t serf_bucket_type_aggregate = {
+    "AGGREGATE",
+    serf_aggregate_read,
+    serf_aggregate_readline,
+    serf_aggregate_read_iovec,
+    serf_default_read_for_sendfile,
+    serf_aggregate_read_bucket,
+    serf_aggregate_peek,
+    serf_aggregate_destroy_and_data,
+};