author     Lorry <lorry@roadtrain.codethink.co.uk>    2012-08-28 15:30:14 +0100
committer  Lorry <lorry@roadtrain.codethink.co.uk>    2012-08-28 15:30:14 +0100
commit     74dd0183e4e56b07cedfa87eae7a8fb3166f01e8 (patch)
tree       4e4ba1caa7f82fd5b1f38c503fac24e5e25b5fc5 /buckets
download   libserf-tarball-74dd0183e4e56b07cedfa87eae7a8fb3166f01e8.tar.gz
Tarball conversion
Diffstat (limited to 'buckets')
-rw-r--r--  buckets/aggregate_buckets.c    400
-rw-r--r--  buckets/allocator.c            434
-rw-r--r--  buckets/barrier_buckets.c       97
-rw-r--r--  buckets/buckets.c              538
-rw-r--r--  buckets/bwtp_buckets.c         596
-rw-r--r--  buckets/chunk_buckets.c        235
-rw-r--r--  buckets/dechunk_buckets.c      185
-rw-r--r--  buckets/deflate_buckets.c      384
-rw-r--r--  buckets/file_buckets.c         117
-rw-r--r--  buckets/headers_buckets.c      429
-rw-r--r--  buckets/iovec_buckets.c        169
-rw-r--r--  buckets/limit_buckets.c        134
-rw-r--r--  buckets/mmap_buckets.c         118
-rw-r--r--  buckets/request_buckets.c      228
-rw-r--r--  buckets/response_buckets.c     429
-rw-r--r--  buckets/simple_buckets.c       142
-rw-r--r--  buckets/socket_buckets.c       114
-rw-r--r--  buckets/ssl_buckets.c         1629
18 files changed, 6378 insertions, 0 deletions
diff --git a/buckets/aggregate_buckets.c b/buckets/aggregate_buckets.c
new file mode 100644
index 0000000..d9d15a3
--- /dev/null
+++ b/buckets/aggregate_buckets.c
@@ -0,0 +1,400 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+/* Should be an APR_RING? */
+typedef struct bucket_list {
+ serf_bucket_t *bucket;
+ struct bucket_list *next;
+} bucket_list_t;
+
+typedef struct {
+ bucket_list_t *list; /* active buckets */
+ bucket_list_t *last; /* last bucket of the list */
+ bucket_list_t *done; /* we finished reading this; now pending a destroy */
+
+ serf_bucket_aggregate_eof_t hold_open;
+ void *hold_open_baton;
+
+ /* Does this bucket own its children? !0 if yes, 0 if not. */
+ int bucket_owner;
+} aggregate_context_t;
+
+
+static void cleanup_aggregate(aggregate_context_t *ctx,
+ serf_bucket_alloc_t *allocator)
+{
+ bucket_list_t *next_list;
+
+ /* If we finished reading a bucket during the previous read, then
+ * we can now toss that bucket.
+ */
+ while (ctx->done != NULL) {
+ next_list = ctx->done->next;
+
+ if (ctx->bucket_owner) {
+ serf_bucket_destroy(ctx->done->bucket);
+ }
+ serf_bucket_mem_free(allocator, ctx->done);
+
+ ctx->done = next_list;
+ }
+}
+
+void serf_bucket_aggregate_cleanup(
+ serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
+{
+ aggregate_context_t *ctx = bucket->data;
+
+ cleanup_aggregate(ctx, allocator);
+}
+
+static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
+{
+ aggregate_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+
+ ctx->list = NULL;
+ ctx->last = NULL;
+ ctx->done = NULL;
+ ctx->hold_open = NULL;
+ ctx->hold_open_baton = NULL;
+ ctx->bucket_owner = 1;
+
+ return ctx;
+}
+
+serf_bucket_t *serf_bucket_aggregate_create(
+ serf_bucket_alloc_t *allocator)
+{
+ aggregate_context_t *ctx;
+
+ ctx = create_aggregate(allocator);
+
+ return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
+}
+
+serf_bucket_t *serf__bucket_stream_create(
+ serf_bucket_alloc_t *allocator,
+ serf_bucket_aggregate_eof_t fn,
+ void *baton)
+{
+ serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
+ aggregate_context_t *ctx = bucket->data;
+
+ serf_bucket_aggregate_hold_open(bucket, fn, baton);
+
+ ctx->bucket_owner = 0;
+
+ return bucket;
+}
+
+
+static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
+{
+ aggregate_context_t *ctx = bucket->data;
+ bucket_list_t *next_ctx;
+
+ while (ctx->list) {
+ if (ctx->bucket_owner) {
+ serf_bucket_destroy(ctx->list->bucket);
+ }
+ next_ctx = ctx->list->next;
+ serf_bucket_mem_free(bucket->allocator, ctx->list);
+ ctx->list = next_ctx;
+ }
+ cleanup_aggregate(ctx, bucket->allocator);
+
+ serf_default_destroy_and_data(bucket);
+}
+
+void serf_bucket_aggregate_become(serf_bucket_t *bucket)
+{
+ aggregate_context_t *ctx;
+
+ ctx = create_aggregate(bucket->allocator);
+
+ bucket->type = &serf_bucket_type_aggregate;
+ bucket->data = ctx;
+
+ /* The allocator remains the same. */
+}
+
+
+void serf_bucket_aggregate_prepend(
+ serf_bucket_t *aggregate_bucket,
+ serf_bucket_t *prepend_bucket)
+{
+ aggregate_context_t *ctx = aggregate_bucket->data;
+ bucket_list_t *new_list;
+
+ new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
+ sizeof(*new_list));
+ new_list->bucket = prepend_bucket;
+ new_list->next = ctx->list;
+
+ ctx->list = new_list;
+}
+
+void serf_bucket_aggregate_append(
+ serf_bucket_t *aggregate_bucket,
+ serf_bucket_t *append_bucket)
+{
+ aggregate_context_t *ctx = aggregate_bucket->data;
+ bucket_list_t *new_list;
+
+ new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
+ sizeof(*new_list));
+ new_list->bucket = append_bucket;
+ new_list->next = NULL;
+
+ /* If we use APR_RING, this is trivial. So, wait.
+ new_list->next = ctx->list;
+ ctx->list = new_list;
+ */
+ if (ctx->list == NULL) {
+ ctx->list = new_list;
+ ctx->last = new_list;
+ }
+ else {
+ ctx->last->next = new_list;
+ ctx->last = ctx->last->next;
+ }
+}
+
+void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket,
+ serf_bucket_aggregate_eof_t fn,
+ void *baton)
+{
+ aggregate_context_t *ctx = aggregate_bucket->data;
+ ctx->hold_open = fn;
+ ctx->hold_open_baton = baton;
+}
+
+void serf_bucket_aggregate_prepend_iovec(
+ serf_bucket_t *aggregate_bucket,
+ struct iovec *vecs,
+ int vecs_count)
+{
+ int i;
+
+ /* Add in reverse order. */
+ for (i = vecs_count - 1; i >= 0; i--) {
+ serf_bucket_t *new_bucket;
+
+ new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
+ vecs[i].iov_len,
+ NULL, NULL,
+ aggregate_bucket->allocator);
+
+ serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
+
+ }
+}
+
+void serf_bucket_aggregate_append_iovec(
+ serf_bucket_t *aggregate_bucket,
+ struct iovec *vecs,
+ int vecs_count)
+{
+ serf_bucket_t *new_bucket;
+
+ new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
+ aggregate_bucket->allocator);
+
+ serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
+}
+
+static apr_status_t read_aggregate(serf_bucket_t *bucket,
+ apr_size_t requested,
+ int vecs_size, struct iovec *vecs,
+ int *vecs_used)
+{
+ aggregate_context_t *ctx = bucket->data;
+ int cur_vecs_used;
+ apr_status_t status;
+
+ *vecs_used = 0;
+
+ if (!ctx->list) {
+ if (ctx->hold_open) {
+ return ctx->hold_open(ctx->hold_open_baton, bucket);
+ }
+ else {
+ return APR_EOF;
+ }
+ }
+
+ status = APR_SUCCESS;
+ while (requested) {
+ serf_bucket_t *head = ctx->list->bucket;
+
+ status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
+ &cur_vecs_used);
+
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ /* Add the number of vecs we read to our running total. */
+ *vecs_used += cur_vecs_used;
+
+ if (cur_vecs_used > 0 || status) {
+ bucket_list_t *next_list;
+
+ /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
+ * as it isn't safe to read more without returning to our caller.
+ */
+ if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
+ return status;
+ }
+
+ /* However, if we read EOF, we can stash this bucket in a
+ * to-be-freed list and move on to the next bucket. This ensures
+ * that the bucket stays alive (so as not to violate our read
+ * semantics). We'll destroy this list of buckets the next time
+ * we are asked to perform a read operation - thus ensuring the
+ * proper read lifetime.
+ */
+ next_list = ctx->list->next;
+ ctx->list->next = ctx->done;
+ ctx->done = ctx->list;
+ ctx->list = next_list;
+
+ /* If we have no more in our list, return EOF. */
+ if (!ctx->list) {
+ if (ctx->hold_open) {
+ return ctx->hold_open(ctx->hold_open_baton, bucket);
+ }
+ else {
+ return APR_EOF;
+ }
+ }
+
+ /* At this point, it is safe to read the next bucket - if we can. */
+
+ /* If the caller doesn't want ALL_AVAIL, decrement the size
+ * of the items we just read from the list.
+ */
+ if (requested != SERF_READ_ALL_AVAIL) {
+ int i;
+
+ for (i = 0; i < cur_vecs_used; i++)
+ requested -= vecs[i].iov_len;
+ }
+
+ /* Adjust our vecs to account for what we just read. */
+ vecs_size -= cur_vecs_used;
+ vecs += cur_vecs_used;
+
+ /* We reached our max. Oh well. */
+ if (!requested || !vecs_size) {
+ return APR_SUCCESS;
+ }
+ }
+ }
+
+ return status;
+}
+
+static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ aggregate_context_t *ctx = bucket->data;
+ struct iovec vec;
+ int vecs_used;
+ apr_status_t status;
+
+ cleanup_aggregate(ctx, bucket->allocator);
+
+ status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
+
+ if (!vecs_used) {
+ *len = 0;
+ }
+ else {
+ *data = vec.iov_base;
+ *len = vec.iov_len;
+ }
+
+ return status;
+}
+
+static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
+ apr_size_t requested,
+ int vecs_size,
+ struct iovec *vecs,
+ int *vecs_used)
+{
+ aggregate_context_t *ctx = bucket->data;
+
+ cleanup_aggregate(ctx, bucket->allocator);
+
+ return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
+}
+
+static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ /* Follow pattern from serf_aggregate_read. */
+ return APR_ENOTIMPL;
+}
+
+static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+ /* Follow pattern from serf_aggregate_read. */
+ return APR_ENOTIMPL;
+}
+
+static serf_bucket_t * serf_aggregate_read_bucket(
+ serf_bucket_t *bucket,
+ const serf_bucket_type_t *type)
+{
+ aggregate_context_t *ctx = bucket->data;
+ serf_bucket_t *found_bucket;
+
+ if (!ctx->list) {
+ return NULL;
+ }
+
+ if (ctx->list->bucket->type == type) {
+ /* Got the bucket. Consume it from our list. */
+ found_bucket = ctx->list->bucket;
+ ctx->list = ctx->list->next;
+ return found_bucket;
+ }
+
+ /* Call read_bucket on first one in our list. */
+ return serf_bucket_read_bucket(ctx->list->bucket, type);
+}
+
+
+const serf_bucket_type_t serf_bucket_type_aggregate = {
+ "AGGREGATE",
+ serf_aggregate_read,
+ serf_aggregate_readline,
+ serf_aggregate_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_aggregate_read_bucket,
+ serf_aggregate_peek,
+ serf_aggregate_destroy_and_data,
+};
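
The aggregate bucket above chains child buckets, drains them in order, and parks exhausted children on the done list until a later read (or the destroy) can free them. Below is a minimal sketch of driving that API from the caller's side, using only functions whose signatures appear in this commit; the program and its strings are illustrative, not part of the change.

#include <stdio.h>
#include <apr_pools.h>
#include "serf.h"

int main(void)
{
    apr_pool_t *pool;
    serf_bucket_alloc_t *alloc;
    serf_bucket_t *agg;
    apr_status_t status;

    apr_initialize();
    apr_pool_create(&pool, NULL);
    alloc = serf_bucket_allocator_create(pool, NULL, NULL);

    /* Two static strings wrapped as simple buckets and appended in order. */
    agg = serf_bucket_aggregate_create(alloc);
    serf_bucket_aggregate_append(agg,
        serf_bucket_simple_create("Hello, ", 7, NULL, NULL, alloc));
    serf_bucket_aggregate_append(agg,
        serf_bucket_simple_create("world\n", 6, NULL, NULL, alloc));

    /* Drain the aggregate: each child is read to EOF, moved to the done
     * list, and freed on a later read or at destroy (see read_aggregate
     * and cleanup_aggregate above). */
    do {
        const char *data;
        apr_size_t len;

        status = serf_bucket_read(agg, SERF_READ_ALL_AVAIL, &data, &len);
        if (SERF_BUCKET_READ_ERROR(status))
            break;
        fwrite(data, 1, len, stdout);
    } while (!APR_STATUS_IS_EOF(status));

    serf_bucket_destroy(agg);
    apr_pool_destroy(pool);
    apr_terminate();
    return 0;
}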
diff --git a/buckets/allocator.c b/buckets/allocator.c
new file mode 100644
index 0000000..857cc74
--- /dev/null
+++ b/buckets/allocator.c
@@ -0,0 +1,434 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+
+#include <apr_pools.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+typedef struct node_header_t {
+ apr_size_t size;
+ union {
+ struct node_header_t *next; /* if size == 0 (freed/inactive) */
+ /* no data if size == STANDARD_NODE_SIZE */
+ apr_memnode_t *memnode; /* if size > STANDARD_NODE_SIZE */
+ } u;
+} node_header_t;
+
+/* The size of a node_header_t, properly aligned. Note that (normally)
+ * this macro will round the size to a multiple of 8 bytes. Keep this in
+ * mind when altering the node_header_t structure. Also, keep in mind that
+ * node_header_t is an overhead for every allocation performed through
+ * the serf_bucket_mem_alloc() function.
+ */
+#define SIZEOF_NODE_HEADER_T APR_ALIGN_DEFAULT(sizeof(node_header_t))
+
+
+/* STANDARD_NODE_SIZE is manually set to an allocation size that will
+ * capture most allocations performed via this API. It must be "large
+ * enough" to avoid lots of spillage to allocating directly from the
+ * apr_allocator associated with the bucket allocator. The apr_allocator
+ * has a minimum size of 8k, which can be expensive if you missed the
+ * STANDARD_NODE_SIZE by just a few bytes.
+ */
+/* ### we should define some rules or ways to determine how to derive
+ * ### a "good" value for this. probably log some stats on allocs, then
+ * ### analyze them for size "misses". then find the balance point between
+ * ### wasted space due to min-size allocator, and wasted-space due to
+ * ### size-spill to the 8k minimum.
+ */
+#define STANDARD_NODE_SIZE 128
+
+/* When allocating a block of memory from the allocator, we should go for
+ * an 8k block, minus the overhead that the allocator needs.
+ */
+#define ALLOC_AMT (8192 - APR_MEMNODE_T_SIZE)
+
+/* Define DEBUG_DOUBLE_FREE if you're interested in debugging double-free
+ * calls to serf_bucket_mem_free().
+ */
+#define DEBUG_DOUBLE_FREE
+
+
+typedef struct {
+ const serf_bucket_t *bucket;
+ apr_status_t last;
+} read_status_t;
+
+#define TRACK_BUCKET_COUNT 100 /* track N buckets' status */
+
+typedef struct {
+ int next_index; /* info[] is a ring. next bucket goes at this idx. */
+ int num_used;
+
+ read_status_t info[TRACK_BUCKET_COUNT];
+} track_state_t;
+
+
+struct serf_bucket_alloc_t {
+ apr_pool_t *pool;
+ apr_allocator_t *allocator;
+ int own_allocator;
+
+ serf_unfreed_func_t unfreed;
+ void *unfreed_baton;
+
+ apr_uint32_t num_alloc;
+
+ node_header_t *freelist; /* free STANDARD_NODE_SIZE blocks */
+ apr_memnode_t *blocks; /* blocks we allocated for subdividing */
+
+ track_state_t *track;
+};
+
+/* ==================================================================== */
+
+
+static apr_status_t allocator_cleanup(void *data)
+{
+ serf_bucket_alloc_t *allocator = data;
+
+ /* If we allocated anything, give it back. */
+ if (allocator->blocks) {
+ apr_allocator_free(allocator->allocator, allocator->blocks);
+ }
+
+ /* If we allocated our own allocator (?!), destroy it here. */
+ if (allocator->own_allocator) {
+ apr_allocator_destroy(allocator->allocator);
+ }
+
+ return APR_SUCCESS;
+}
+
+serf_bucket_alloc_t *serf_bucket_allocator_create(
+ apr_pool_t *pool,
+ serf_unfreed_func_t unfreed,
+ void *unfreed_baton)
+{
+ serf_bucket_alloc_t *allocator = apr_pcalloc(pool, sizeof(*allocator));
+
+ allocator->pool = pool;
+ allocator->allocator = apr_pool_allocator_get(pool);
+ if (allocator->allocator == NULL) {
+ /* This most likely means pools are running in debug mode, create our
+ * own allocator to deal with memory ourselves */
+ apr_allocator_create(&allocator->allocator);
+ allocator->own_allocator = 1;
+ }
+ allocator->unfreed = unfreed;
+ allocator->unfreed_baton = unfreed_baton;
+
+#ifdef SERF_DEBUG_BUCKET_USE
+ {
+ track_state_t *track;
+
+ track = allocator->track = apr_palloc(pool, sizeof(*allocator->track));
+ track->next_index = 0;
+ track->num_used = 0;
+ }
+#endif
+
+ /* ### this implies buckets cannot cross a fork/exec. desirable?
+ *
+ * ### hmm. it probably also means that buckets cannot be AROUND
+ * ### during a fork/exec. the new process will try to clean them
+ * ### up and figure out there are unfreed blocks...
+ */
+ apr_pool_cleanup_register(pool, allocator,
+ allocator_cleanup, allocator_cleanup);
+
+ return allocator;
+}
+
+apr_pool_t *serf_bucket_allocator_get_pool(
+ const serf_bucket_alloc_t *allocator)
+{
+ return allocator->pool;
+}
+
+
+void *serf_bucket_mem_alloc(
+ serf_bucket_alloc_t *allocator,
+ apr_size_t size)
+{
+ node_header_t *node;
+
+ ++allocator->num_alloc;
+
+ size += SIZEOF_NODE_HEADER_T;
+ if (size <= STANDARD_NODE_SIZE) {
+ if (allocator->freelist) {
+ /* just pull a node off our freelist */
+ node = allocator->freelist;
+ allocator->freelist = node->u.next;
+#ifdef DEBUG_DOUBLE_FREE
+ /* When we free an item, we set its size to zero. Thus, when
+ * we return it to the caller, we must ensure the size is set
+ * properly.
+ */
+ node->size = STANDARD_NODE_SIZE;
+#endif
+ }
+ else {
+ apr_memnode_t *active = allocator->blocks;
+
+ if (active == NULL
+ || active->first_avail + STANDARD_NODE_SIZE >= active->endp) {
+ apr_memnode_t *head = allocator->blocks;
+
+ /* ran out of room. grab another block. */
+ active = apr_allocator_alloc(allocator->allocator, ALLOC_AMT);
+
+ /* System couldn't provide us with memory. */
+ if (active == NULL)
+ return NULL;
+
+ /* link the block into our tracking list */
+ allocator->blocks = active;
+ active->next = head;
+ }
+
+ node = (node_header_t *)active->first_avail;
+ node->size = STANDARD_NODE_SIZE;
+ active->first_avail += STANDARD_NODE_SIZE;
+ }
+ }
+ else {
+ apr_memnode_t *memnode = apr_allocator_alloc(allocator->allocator,
+ size);
+
+ if (memnode == NULL)
+ return NULL;
+
+ node = (node_header_t *)memnode->first_avail;
+ node->u.memnode = memnode;
+ node->size = size;
+ }
+
+ return ((char *)node) + SIZEOF_NODE_HEADER_T;
+}
+
+
+void *serf_bucket_mem_calloc(
+ serf_bucket_alloc_t *allocator,
+ apr_size_t size)
+{
+ void *mem;
+ mem = serf_bucket_mem_alloc(allocator, size);
+ if (mem == NULL)
+ return NULL;
+ memset(mem, 0, size);
+ return mem;
+}
+
+
+void serf_bucket_mem_free(
+ serf_bucket_alloc_t *allocator,
+ void *block)
+{
+ node_header_t *node;
+
+ --allocator->num_alloc;
+
+ node = (node_header_t *)((char *)block - SIZEOF_NODE_HEADER_T);
+
+ if (node->size == STANDARD_NODE_SIZE) {
+ /* put the node onto our free list */
+ node->u.next = allocator->freelist;
+ allocator->freelist = node;
+
+#ifdef DEBUG_DOUBLE_FREE
+ /* note that this thing was freed. */
+ node->size = 0;
+ }
+ else if (node->size == 0) {
+ /* damn thing was freed already. */
+ abort();
+#endif
+ }
+ else {
+#ifdef DEBUG_DOUBLE_FREE
+ /* note that this thing was freed. */
+ node->size = 0;
+#endif
+
+ /* now free it */
+ apr_allocator_free(allocator->allocator, node->u.memnode);
+ }
+}
+
+
+/* ==================================================================== */
+
+
+#ifdef SERF_DEBUG_BUCKET_USE
+
+static read_status_t *find_read_status(
+ track_state_t *track,
+ const serf_bucket_t *bucket,
+ int create_rs)
+{
+ read_status_t *rs;
+
+ if (track->num_used) {
+ int count = track->num_used;
+ int idx = track->next_index;
+
+ /* Search backwards. In all likelihood, the bucket which just got
+ * read was read very recently.
+ */
+ while (count-- > 0) {
+ if (!idx--) {
+ /* assert: track->num_used == TRACK_BUCKET_COUNT */
+ idx = track->num_used - 1;
+ }
+ if ((rs = &track->info[idx])->bucket == bucket) {
+ return rs;
+ }
+ }
+ }
+
+ /* Only create a new read_status_t when asked. */
+ if (!create_rs)
+ return NULL;
+
+ if (track->num_used < TRACK_BUCKET_COUNT) {
+ /* We're still filling up the ring. */
+ ++track->num_used;
+ }
+
+ rs = &track->info[track->next_index];
+ rs->bucket = bucket;
+ rs->last = APR_SUCCESS; /* ### the right initial value? */
+
+ if (++track->next_index == TRACK_BUCKET_COUNT)
+ track->next_index = 0;
+
+ return rs;
+}
+
+#endif /* SERF_DEBUG_BUCKET_USE */
+
+
+apr_status_t serf_debug__record_read(
+ const serf_bucket_t *bucket,
+ apr_status_t status)
+{
+#ifndef SERF_DEBUG_BUCKET_USE
+ return status;
+#else
+
+ track_state_t *track = bucket->allocator->track;
+ read_status_t *rs = find_read_status(track, bucket, 1);
+
+ /* Validate that the previous status value allowed for another read. */
+ if (APR_STATUS_IS_EAGAIN(rs->last) /* ### or APR_EOF? */) {
+ /* Somebody read when they weren't supposed to. Bail. */
+ abort();
+ }
+
+ /* Save the current status for later. */
+ rs->last = status;
+
+ return status;
+#endif
+}
+
+
+void serf_debug__entered_loop(serf_bucket_alloc_t *allocator)
+{
+#ifdef SERF_DEBUG_BUCKET_USE
+
+ track_state_t *track = allocator->track;
+ read_status_t *rs = &track->info[0];
+
+ for ( ; track->num_used; --track->num_used, ++rs ) {
+ if (rs->last == APR_SUCCESS) {
+ /* Somebody should have read this bucket again. */
+ abort();
+ }
+
+ /* ### other status values? */
+ }
+
+ /* num_used was reset. also need to reset the next index. */
+ track->next_index = 0;
+
+#endif
+}
+
+
+void serf_debug__closed_conn(serf_bucket_alloc_t *allocator)
+{
+#ifdef SERF_DEBUG_BUCKET_USE
+
+ /* Just reset the number used so that we don't examine the info[] */
+ allocator->track->num_used = 0;
+ allocator->track->next_index = 0;
+
+#endif
+}
+
+
+void serf_debug__bucket_destroy(const serf_bucket_t *bucket)
+{
+#ifdef SERF_DEBUG_BUCKET_USE
+
+ track_state_t *track = bucket->allocator->track;
+ read_status_t *rs = find_read_status(track, bucket, 0);
+
+ if (rs != NULL && rs->last != APR_EOF) {
+ /* The bucket was destroyed before it was read to completion. */
+
+ /* Special exception for socket buckets. If a connection remains
+ * open, they are not read to completion.
+ */
+ if (SERF_BUCKET_IS_SOCKET(bucket))
+ return;
+
+ /* Ditto for SSL Decrypt buckets. */
+ if (SERF_BUCKET_IS_SSL_DECRYPT(bucket))
+ return;
+
+ /* Ditto for SSL Encrypt buckets. */
+ if (SERF_BUCKET_IS_SSL_ENCRYPT(bucket))
+ return;
+
+ /* Ditto for barrier buckets. */
+ if (SERF_BUCKET_IS_BARRIER(bucket))
+ return;
+
+
+ abort();
+ }
+
+#endif
+}
+
+
+void serf_debug__bucket_alloc_check(
+ serf_bucket_alloc_t *allocator)
+{
+#ifdef SERF_DEBUG_BUCKET_USE
+ if (allocator->num_alloc != 0) {
+ abort();
+ }
+#endif
+}
+
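The allocator above serves small requests (node header included, up to STANDARD_NODE_SIZE bytes) from a freelist carved out of 8k apr_allocator blocks, and gives anything larger its own memnode. A short sketch of the alloc/free round trip; the sizes and the helper name are chosen purely for illustration.

#include <apr_pools.h>
#include "serf.h"

static void exercise_allocator(apr_pool_t *pool)
{
    serf_bucket_alloc_t *alloc;
    char *small, *large;

    alloc = serf_bucket_allocator_create(pool, NULL, NULL);

    /* Small request: fits in a STANDARD_NODE_SIZE node, so it comes from
     * the freelist or the current 8k block. */
    small = serf_bucket_mem_alloc(alloc, 64);

    /* Large request: gets a dedicated apr_memnode_t. */
    large = serf_bucket_mem_alloc(alloc, 4096);

    serf_bucket_mem_free(alloc, large);
    serf_bucket_mem_free(alloc, small);   /* node goes back on the freelist */

    /* With SERF_DEBUG_BUCKET_USE defined, this aborts if any allocation
     * from this allocator is still outstanding. */
    serf_debug__bucket_alloc_check(alloc);
}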
diff --git a/buckets/barrier_buckets.c b/buckets/barrier_buckets.c
new file mode 100644
index 0000000..eb410ee
--- /dev/null
+++ b/buckets/barrier_buckets.c
@@ -0,0 +1,97 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_pools.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+typedef struct {
+ serf_bucket_t *stream;
+} barrier_context_t;
+
+
+serf_bucket_t *serf_bucket_barrier_create(
+ serf_bucket_t *stream,
+ serf_bucket_alloc_t *allocator)
+{
+ barrier_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->stream = stream;
+
+ return serf_bucket_create(&serf_bucket_type_barrier, allocator, ctx);
+}
+
+static apr_status_t serf_barrier_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ barrier_context_t *ctx = bucket->data;
+
+ return serf_bucket_read(ctx->stream, requested, data, len);
+}
+
+static apr_status_t serf_barrier_read_iovec(serf_bucket_t *bucket,
+ apr_size_t requested,
+ int vecs_size, struct iovec *vecs,
+ int *vecs_used)
+{
+ barrier_context_t *ctx = bucket->data;
+
+ return serf_bucket_read_iovec(ctx->stream, requested, vecs_size, vecs,
+ vecs_used);
+}
+
+static apr_status_t serf_barrier_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ barrier_context_t *ctx = bucket->data;
+
+ return serf_bucket_readline(ctx->stream, acceptable, found, data, len);
+}
+
+static apr_status_t serf_barrier_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+ barrier_context_t *ctx = bucket->data;
+
+ return serf_bucket_peek(ctx->stream, data, len);
+}
+
+static void serf_barrier_destroy(serf_bucket_t *bucket)
+{
+ /* The intent of this bucket is not to let our wrapped buckets be
+ * destroyed. */
+
+ /* The option is for us to go ahead and 'eat' this bucket now,
+ * or just ignore the deletion entirely.
+ */
+ serf_default_destroy_and_data(bucket);
+}
+
+const serf_bucket_type_t serf_bucket_type_barrier = {
+ "BARRIER",
+ serf_barrier_read,
+ serf_barrier_readline,
+ serf_barrier_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_barrier_peek,
+ serf_barrier_destroy,
+};
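
A barrier forwards every read to the bucket it wraps but swallows the destroy, so the wrapped stream survives when a consumer destroys the bucket it was handed. The sketch below shows the pattern this commit itself relies on in bwtp_buckets.c (barrier plus limit to expose a fixed-length body view); the helper name is illustrative.

#include "serf.h"

/* Expose exactly body_len bytes of stream as a separate bucket. Destroying
 * the returned bucket does not destroy stream itself, because the barrier's
 * destroy only frees its own context. */
static serf_bucket_t *make_body_view(serf_bucket_t *stream,
                                     apr_size_t body_len,
                                     serf_bucket_alloc_t *alloc)
{
    serf_bucket_t *body = serf_bucket_barrier_create(stream, alloc);

    return serf_bucket_limit_create(body, body_len, alloc);
}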
diff --git a/buckets/buckets.c b/buckets/buckets.c
new file mode 100644
index 0000000..29175ff
--- /dev/null
+++ b/buckets/buckets.c
@@ -0,0 +1,538 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_pools.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+serf_bucket_t *serf_bucket_create(
+ const serf_bucket_type_t *type,
+ serf_bucket_alloc_t *allocator,
+ void *data)
+{
+ serf_bucket_t *bkt = serf_bucket_mem_alloc(allocator, sizeof(*bkt));
+
+ bkt->type = type;
+ bkt->data = data;
+ bkt->allocator = allocator;
+
+ return bkt;
+}
+
+
+apr_status_t serf_default_read_iovec(
+ serf_bucket_t *bucket,
+ apr_size_t requested,
+ int vecs_size,
+ struct iovec *vecs,
+ int *vecs_used)
+{
+ const char *data;
+ apr_size_t len;
+
+ /* Read some data from the bucket.
+ *
+ * Because we're an internal 'helper' to the bucket, we can't call the
+ * normal serf_bucket_read() call because the debug allocator tracker will
+ * end up marking the bucket as read *twice* - once for us and once for
+ * our caller - which is reading the same bucket. This leads to premature
+ * abort()s if we ever see EAGAIN. Instead, we'll go directly to the
+ * vtable and bypass the debug tracker.
+ */
+ apr_status_t status = bucket->type->read(bucket, requested, &data, &len);
+
+ /* assert that vecs_size >= 1 ? */
+
+ /* Return that data as a single iovec. */
+ if (len) {
+ vecs[0].iov_base = (void *)data; /* loses the 'const' */
+ vecs[0].iov_len = len;
+ *vecs_used = 1;
+ }
+ else {
+ *vecs_used = 0;
+ }
+
+ return status;
+}
+
+
+apr_status_t serf_default_read_for_sendfile(
+ serf_bucket_t *bucket,
+ apr_size_t requested,
+ apr_hdtr_t *hdtr,
+ apr_file_t **file,
+ apr_off_t *offset,
+ apr_size_t *len)
+{
+ /* Read a bunch of stuff into the headers.
+ *
+ * See serf_default_read_iovec as to why we call into the vtable
+ * directly.
+ */
+ apr_status_t status = bucket->type->read_iovec(bucket, requested,
+ hdtr->numheaders,
+ hdtr->headers,
+ &hdtr->numheaders);
+
+ /* There isn't a file, and there are no trailers. */
+ *file = NULL;
+ hdtr->numtrailers = 0;
+
+ return status;
+}
+
+
+serf_bucket_t *serf_default_read_bucket(
+ serf_bucket_t *bucket,
+ const serf_bucket_type_t *type)
+{
+ return NULL;
+}
+
+
+void serf_default_destroy(serf_bucket_t *bucket)
+{
+#ifdef SERF_DEBUG_BUCKET_USE
+ serf_debug__bucket_destroy(bucket);
+#endif
+
+ serf_bucket_mem_free(bucket->allocator, bucket);
+}
+
+
+void serf_default_destroy_and_data(serf_bucket_t *bucket)
+{
+ serf_bucket_mem_free(bucket->allocator, bucket->data);
+ serf_default_destroy(bucket);
+}
+
+
+/* ==================================================================== */
+
+
+char *serf_bstrmemdup(serf_bucket_alloc_t *allocator,
+ const char *str,
+ apr_size_t size)
+{
+ char *newstr = serf_bucket_mem_alloc(allocator, size + 1);
+ memcpy(newstr, str, size);
+ newstr[size] = '\0';
+ return newstr;
+}
+
+
+void *serf_bmemdup(serf_bucket_alloc_t *allocator,
+ const void *mem,
+ apr_size_t size)
+{
+ void *newmem = serf_bucket_mem_alloc(allocator, size);
+ memcpy(newmem, mem, size);
+ return newmem;
+}
+
+
+char *serf_bstrdup(serf_bucket_alloc_t *allocator,
+ const char *str)
+{
+ apr_size_t size = strlen(str) + 1;
+ char *newstr = serf_bucket_mem_alloc(allocator, size);
+ memcpy(newstr, str, size);
+ return newstr;
+}
+
+
+/* ==================================================================== */
+
+
+static void find_crlf(const char **data, apr_size_t *len, int *found)
+{
+ const char *start = *data;
+ const char *end = start + *len;
+
+ while (start < end) {
+ const char *cr = memchr(start, '\r', *len);
+
+ if (cr == NULL) {
+ break;
+ }
+ ++cr;
+
+ if (cr < end && cr[0] == '\n') {
+ *len -= cr + 1 - start;
+ *data = cr + 1;
+ *found = SERF_NEWLINE_CRLF;
+ return;
+ }
+ if (cr == end) {
+ *len = 0;
+ *data = end;
+ *found = SERF_NEWLINE_CRLF_SPLIT;
+ return;
+ }
+
+ /* It was a bare CR without an LF. Just move past it. */
+ *len -= cr - start;
+ start = cr;
+ }
+
+ *data = start + *len;
+ *len -= *data - start;
+ *found = SERF_NEWLINE_NONE;
+}
+
+
+void serf_util_readline(
+ const char **data,
+ apr_size_t *len,
+ int acceptable,
+ int *found)
+{
+ const char *start;
+ const char *cr;
+ const char *lf;
+ int want_cr;
+ int want_crlf;
+ int want_lf;
+
+ /* If _only_ CRLF is acceptable, then the scanning needs a loop to
+ * skip false hits on CR characters. Use a separate function.
+ */
+ if (acceptable == SERF_NEWLINE_CRLF) {
+ find_crlf(data, len, found);
+ return;
+ }
+
+ start = *data;
+ cr = lf = NULL;
+ want_cr = acceptable & SERF_NEWLINE_CR;
+ want_crlf = acceptable & SERF_NEWLINE_CRLF;
+ want_lf = acceptable & SERF_NEWLINE_LF;
+
+ if (want_cr || want_crlf) {
+ cr = memchr(start, '\r', *len);
+ }
+ if (want_lf) {
+ lf = memchr(start, '\n', *len);
+ }
+
+ if (cr != NULL) {
+ if (lf != NULL) {
+ if (cr + 1 == lf)
+ *found = want_crlf ? SERF_NEWLINE_CRLF : SERF_NEWLINE_CR;
+ else if (want_cr && cr < lf)
+ *found = SERF_NEWLINE_CR;
+ else
+ *found = SERF_NEWLINE_LF;
+ }
+ else if (cr == start + *len - 1) {
+ /* the CR occurred in the last byte of the buffer. this could be
+ * a CRLF split across the data boundary.
+ * ### FIX THIS LOGIC? does caller need to detect?
+ */
+ *found = want_crlf ? SERF_NEWLINE_CRLF_SPLIT : SERF_NEWLINE_CR;
+ }
+ else if (want_cr)
+ *found = SERF_NEWLINE_CR;
+ else /* want_crlf */
+ *found = SERF_NEWLINE_NONE;
+ }
+ else if (lf != NULL)
+ *found = SERF_NEWLINE_LF;
+ else
+ *found = SERF_NEWLINE_NONE;
+
+ switch (*found) {
+ case SERF_NEWLINE_LF:
+ *data = lf + 1;
+ break;
+ case SERF_NEWLINE_CR:
+ case SERF_NEWLINE_CRLF:
+ case SERF_NEWLINE_CRLF_SPLIT:
+ *data = cr + 1 + (*found == SERF_NEWLINE_CRLF);
+ break;
+ case SERF_NEWLINE_NONE:
+ *data += *len;
+ break;
+ default:
+ /* Not reachable */
+ return;
+ }
+
+ *len -= *data - start;
+}
+
+
+/* ==================================================================== */
+
+
+void serf_databuf_init(serf_databuf_t *databuf)
+{
+ /* nothing is sitting in the buffer */
+ databuf->remaining = 0;
+
+ /* avoid thinking we have hit EOF */
+ databuf->status = APR_SUCCESS;
+}
+
+/* Ensure the buffer is prepared for reading. Will return APR_SUCCESS,
+ * APR_EOF, or some failure code. *len is only set for EOF. */
+static apr_status_t common_databuf_prep(serf_databuf_t *databuf,
+ apr_size_t *len)
+{
+ apr_size_t readlen;
+ apr_status_t status;
+
+ /* if there is data in the buffer, then we're happy. */
+ if (databuf->remaining > 0)
+ return APR_SUCCESS;
+
+ /* if we already hit EOF, then keep returning that. */
+ if (APR_STATUS_IS_EOF(databuf->status)) {
+ /* *data = NULL; ?? */
+ *len = 0;
+ return APR_EOF;
+ }
+
+ /* refill the buffer */
+ status = (*databuf->read)(databuf->read_baton, sizeof(databuf->buf),
+ databuf->buf, &readlen);
+ if (SERF_BUCKET_READ_ERROR(status)) {
+ return status;
+ }
+
+ databuf->current = databuf->buf;
+ databuf->remaining = readlen;
+ databuf->status = status;
+
+ return APR_SUCCESS;
+}
+
+
+apr_status_t serf_databuf_read(
+ serf_databuf_t *databuf,
+ apr_size_t requested,
+ const char **data,
+ apr_size_t *len)
+{
+ apr_status_t status = common_databuf_prep(databuf, len);
+ if (status)
+ return status;
+
+ /* peg the requested amount to what we have remaining */
+ if (requested == SERF_READ_ALL_AVAIL || requested > databuf->remaining)
+ requested = databuf->remaining;
+
+ /* return the values */
+ *data = databuf->current;
+ *len = requested;
+
+ /* adjust our internal state to note we've consumed some data */
+ databuf->current += requested;
+ databuf->remaining -= requested;
+
+ /* If we read everything, then we need to return whatever the data
+ * read returned to us. This is going to be APR_EOF or APR_EAGAIN.
+ * If we have NOT read everything, then return APR_SUCCESS to indicate
+ * that we're ready to return some more if asked.
+ */
+ return databuf->remaining ? APR_SUCCESS : databuf->status;
+}
+
+
+apr_status_t serf_databuf_readline(
+ serf_databuf_t *databuf,
+ int acceptable,
+ int *found,
+ const char **data,
+ apr_size_t *len)
+{
+ apr_status_t status = common_databuf_prep(databuf, len);
+ if (status)
+ return status;
+
+ /* the returned line will start at the current position. */
+ *data = databuf->current;
+
+ /* read a line from the buffer, and adjust the various pointers. */
+ serf_util_readline(&databuf->current, &databuf->remaining, acceptable,
+ found);
+
+ /* the length matches the amount consumed by the readline */
+ *len = databuf->current - *data;
+
+ /* see serf_databuf_read's return condition */
+ return databuf->remaining ? APR_SUCCESS : databuf->status;
+}
+
+
+apr_status_t serf_databuf_peek(
+ serf_databuf_t *databuf,
+ const char **data,
+ apr_size_t *len)
+{
+ apr_status_t status = common_databuf_prep(databuf, len);
+ if (status)
+ return status;
+
+ /* return everything we have */
+ *data = databuf->current;
+ *len = databuf->remaining;
+
+ /* If the last read returned EOF, then the peek should return the same.
+ * The other possibility in databuf->status is APR_EAGAIN, which we
+ * should never return. Thus, just return APR_SUCCESS for non-EOF cases.
+ */
+ if (APR_STATUS_IS_EOF(databuf->status))
+ return APR_EOF;
+ return APR_SUCCESS;
+}
+
+
+/* ==================================================================== */
+
+
+void serf_linebuf_init(serf_linebuf_t *linebuf)
+{
+ linebuf->state = SERF_LINEBUF_EMPTY;
+ linebuf->used = 0;
+}
+
+
+apr_status_t serf_linebuf_fetch(
+ serf_linebuf_t *linebuf,
+ serf_bucket_t *bucket,
+ int acceptable)
+{
+ /* If we had a complete line, then assume the caller has used it, so
+ * we can now reset the state.
+ */
+ if (linebuf->state == SERF_LINEBUF_READY) {
+ linebuf->state = SERF_LINEBUF_EMPTY;
+
+ /* Reset the line_used, too, so we don't have to test the state
+ * before using this value.
+ */
+ linebuf->used = 0;
+ }
+
+ while (1) {
+ apr_status_t status;
+ const char *data;
+ apr_size_t len;
+
+ if (linebuf->state == SERF_LINEBUF_CRLF_SPLIT) {
+ /* On the previous read, we received just a CR. The LF might
+ * be present, but the bucket couldn't see it. We need to
+ * examine a single character to determine how to handle the
+ * split CRLF.
+ */
+
+ status = serf_bucket_peek(bucket, &data, &len);
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ if (len > 0) {
+ if (*data == '\n') {
+ /* We saw the second part of CRLF. We don't need to
+ * save that character, so do an actual read to suck
+ * up that character.
+ */
+ /* ### check status */
+ (void) serf_bucket_read(bucket, 1, &data, &len);
+ }
+ /* else:
+ * We saw the first character of the next line. Thus,
+ * the current line is terminated by the CR. Just
+ * ignore whatever we peeked at. The next reader will
+ * see it and handle it as appropriate.
+ */
+
+ /* Whatever was read, the line is now ready for use. */
+ linebuf->state = SERF_LINEBUF_READY;
+ }
+ /* ### we need data. gotta check this char. bail if zero?! */
+ /* else len == 0 */
+
+ /* ### status */
+ }
+ else {
+ int found;
+
+ status = serf_bucket_readline(bucket, acceptable, &found,
+ &data, &len);
+ if (SERF_BUCKET_READ_ERROR(status)) {
+ return status;
+ }
+ /* Some bucket types (socket) might need an extra read to find
+ out EOF state, so they'll return no data in that read. This
+ means we're done reading, return what we got. */
+ if (APR_STATUS_IS_EOF(status) && len == 0) {
+ return status;
+ }
+ if (linebuf->used + len > sizeof(linebuf->line)) {
+ /* ### need a "line too long" error */
+ return APR_EGENERAL;
+ }
+
+ /* Note: our logic doesn't change for SERF_LINEBUF_PARTIAL. That
+ * only affects how we fill the buffer. It is a communication to
+ * our caller on whether the line is ready or not.
+ */
+
+ /* If we didn't see a newline, then we should mark the line
+ * buffer as partially complete.
+ */
+ if (found == SERF_NEWLINE_NONE) {
+ linebuf->state = SERF_LINEBUF_PARTIAL;
+ }
+ else if (found == SERF_NEWLINE_CRLF_SPLIT) {
+ linebuf->state = SERF_LINEBUF_CRLF_SPLIT;
+
+ /* Toss the partial CR. We won't ever need it. */
+ --len;
+ }
+ else {
+ /* We got a newline (of some form). We don't need it
+ * in the line buffer, so back up the length. Then
+ * mark the line as ready.
+ */
+ len -= 1 + (found == SERF_NEWLINE_CRLF);
+
+ linebuf->state = SERF_LINEBUF_READY;
+ }
+
+ /* ### it would be nice to avoid this copy if at all possible,
+ ### and just return a data/len pair to the caller. we're
+ ### keeping it simple for now. */
+ memcpy(&linebuf->line[linebuf->used], data, len);
+ linebuf->used += len;
+ }
+
+ /* If we saw anything besides "success. please read again", then
+ * we should return that status. If the line was completed, then
+ * we should also return.
+ */
+ if (status || linebuf->state == SERF_LINEBUF_READY)
+ return status;
+
+ /* We got APR_SUCCESS and the line buffer is not complete. Let's
+ * loop to read some more data.
+ */
+ }
+ /* NOTREACHED */
+}
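
serf_util_readline above consumes one line from a data/len pair, advancing the pointers and reporting which terminator it saw; serf_linebuf_fetch then layers buffering and split-CRLF handling on top of it. A minimal sketch of the lower-level call, with the input string made up for illustration:

#include <stdio.h>
#include "serf.h"

static void show_lines(void)
{
    const char *data = "first\r\nsecond\nrest";
    apr_size_t len = 18;

    while (len) {
        const char *line = data;
        int found;

        /* Consumes one line (any newline style accepted); data/len are
         * advanced past it and found reports CR, LF, CRLF, a split CRLF,
         * or SERF_NEWLINE_NONE. */
        serf_util_readline(&data, &len, SERF_NEWLINE_ANY, &found);

        printf("consumed %d bytes, terminator code %d\n",
               (int)(data - line), found);
    }
}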
diff --git a/buckets/bwtp_buckets.c b/buckets/bwtp_buckets.c
new file mode 100644
index 0000000..7ef3047
--- /dev/null
+++ b/buckets/bwtp_buckets.c
@@ -0,0 +1,596 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_pools.h>
+#include <apr_strings.h>
+#include <apr_lib.h>
+#include <apr_date.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+#include "serf_bucket_types.h"
+
+#include <stdlib.h>
+
+/* This is an implementation of Bidirectional Web Transfer Protocol (BWTP)
+ * See:
+ * http://bwtp.wikidot.com/
+ */
+
+typedef struct {
+ int channel;
+ int open;
+ int type; /* 0 = header, 1 = message */ /* TODO enum? */
+ const char *phrase;
+ serf_bucket_t *headers;
+
+ char req_line[1000];
+} frame_context_t;
+
+typedef struct {
+ serf_bucket_t *stream;
+ serf_bucket_t *body; /* Pointer to the stream wrapping the body. */
+ serf_bucket_t *headers; /* holds parsed headers */
+
+ enum {
+ STATE_STATUS_LINE, /* reading status line */
+ STATE_HEADERS, /* reading headers */
+ STATE_BODY, /* reading body */
+ STATE_DONE /* we've sent EOF */
+ } state;
+
+ /* Buffer for accumulating a line from the response. */
+ serf_linebuf_t linebuf;
+
+ int type; /* 0 = header, 1 = message */ /* TODO enum? */
+ int channel;
+ char *phrase;
+ apr_size_t length;
+} incoming_context_t;
+
+
+serf_bucket_t *serf_bucket_bwtp_channel_close(
+ int channel,
+ serf_bucket_alloc_t *allocator)
+{
+ frame_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->type = 0;
+ ctx->open = 0;
+ ctx->channel = channel;
+ ctx->phrase = "CLOSED";
+ ctx->headers = serf_bucket_headers_create(allocator);
+
+ return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
+}
+
+serf_bucket_t *serf_bucket_bwtp_channel_open(
+ int channel,
+ const char *uri,
+ serf_bucket_alloc_t *allocator)
+{
+ frame_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->type = 0;
+ ctx->open = 1;
+ ctx->channel = channel;
+ ctx->phrase = uri;
+ ctx->headers = serf_bucket_headers_create(allocator);
+
+ return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
+}
+
+serf_bucket_t *serf_bucket_bwtp_header_create(
+ int channel,
+ const char *phrase,
+ serf_bucket_alloc_t *allocator)
+{
+ frame_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->type = 0;
+ ctx->open = 0;
+ ctx->channel = channel;
+ ctx->phrase = phrase;
+ ctx->headers = serf_bucket_headers_create(allocator);
+
+ return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
+}
+
+serf_bucket_t *serf_bucket_bwtp_message_create(
+ int channel,
+ serf_bucket_t *body,
+ serf_bucket_alloc_t *allocator)
+{
+ frame_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->type = 1;
+ ctx->open = 0;
+ ctx->channel = channel;
+ ctx->phrase = "MESSAGE";
+ ctx->headers = serf_bucket_headers_create(allocator);
+
+ return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
+}
+
+int serf_bucket_bwtp_frame_get_channel(
+ serf_bucket_t *bucket)
+{
+ if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
+ frame_context_t *ctx = bucket->data;
+
+ return ctx->channel;
+ }
+ else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
+ incoming_context_t *ctx = bucket->data;
+
+ return ctx->channel;
+ }
+
+ return -1;
+}
+
+int serf_bucket_bwtp_frame_get_type(
+ serf_bucket_t *bucket)
+{
+ if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
+ frame_context_t *ctx = bucket->data;
+
+ return ctx->type;
+ }
+ else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
+ incoming_context_t *ctx = bucket->data;
+
+ return ctx->type;
+ }
+
+ return -1;
+}
+
+const char *serf_bucket_bwtp_frame_get_phrase(
+ serf_bucket_t *bucket)
+{
+ if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
+ frame_context_t *ctx = bucket->data;
+
+ return ctx->phrase;
+ }
+ else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
+ incoming_context_t *ctx = bucket->data;
+
+ return ctx->phrase;
+ }
+
+ return NULL;
+}
+
+serf_bucket_t *serf_bucket_bwtp_frame_get_headers(
+ serf_bucket_t *bucket)
+{
+ if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
+ frame_context_t *ctx = bucket->data;
+
+ return ctx->headers;
+ }
+ else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
+ incoming_context_t *ctx = bucket->data;
+
+ return ctx->headers;
+ }
+
+ return NULL;
+}
+
+static int count_size(void *baton, const char *key, const char *value)
+{
+ apr_size_t *c = baton;
+ /* TODO Deal with folding. Yikes. */
+
+ /* Add in ": " and CRLF - so an extra four bytes. */
+ *c += strlen(key) + strlen(value) + 4;
+
+ return 0;
+}
+
+static apr_size_t calc_header_size(serf_bucket_t *hdrs)
+{
+ apr_size_t size = 0;
+
+ serf_bucket_headers_do(hdrs, count_size, &size);
+
+ return size;
+}
+
+static void serialize_data(serf_bucket_t *bucket)
+{
+ frame_context_t *ctx = bucket->data;
+ serf_bucket_t *new_bucket;
+ apr_size_t req_len;
+
+ /* Serialize the request-line and headers into one mother string,
+ * and wrap a bucket around it.
+ */
+ req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line),
+ "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n",
+ (ctx->type ? "BWM" : "BWH"),
+ ctx->channel, calc_header_size(ctx->headers),
+ (ctx->open ? "OPEN " : ""),
+ ctx->phrase);
+ new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len,
+ bucket->allocator);
+
+ /* Build up the new bucket structure.
+ *
+ * Note that self needs to become an aggregate bucket so that a
+ * pointer to self still represents the "right" data.
+ */
+ serf_bucket_aggregate_become(bucket);
+
+ /* Insert the two buckets. */
+ serf_bucket_aggregate_append(bucket, new_bucket);
+ serf_bucket_aggregate_append(bucket, ctx->headers);
+
+ /* Our private context is no longer needed, and is not referred to by
+ * any existing bucket. Toss it.
+ */
+ serf_bucket_mem_free(bucket->allocator, ctx);
+}
+
+static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ /* Serialize our private data into a new aggregate bucket. */
+ serialize_data(bucket);
+
+ /* Delegate to the "new" aggregate bucket to do the read. */
+ return serf_bucket_read(bucket, requested, data, len);
+}
+
+static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ /* Serialize our private data into a new aggregate bucket. */
+ serialize_data(bucket);
+
+ /* Delegate to the "new" aggregate bucket to do the readline. */
+ return serf_bucket_readline(bucket, acceptable, found, data, len);
+}
+
+static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket,
+ apr_size_t requested,
+ int vecs_size,
+ struct iovec *vecs,
+ int *vecs_used)
+{
+ /* Serialize our private data into a new aggregate bucket. */
+ serialize_data(bucket);
+
+ /* Delegate to the "new" aggregate bucket to do the read. */
+ return serf_bucket_read_iovec(bucket, requested,
+ vecs_size, vecs, vecs_used);
+}
+
+static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+ /* Serialize our private data into a new aggregate bucket. */
+ serialize_data(bucket);
+
+ /* Delegate to the "new" aggregate bucket to do the peek. */
+ return serf_bucket_peek(bucket, data, len);
+}
+
+const serf_bucket_type_t serf_bucket_type_bwtp_frame = {
+ "BWTP-FRAME",
+ serf_bwtp_frame_read,
+ serf_bwtp_frame_readline,
+ serf_bwtp_frame_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_bwtp_frame_peek,
+ serf_default_destroy_and_data,
+};
+
+
+serf_bucket_t *serf_bucket_bwtp_incoming_frame_create(
+ serf_bucket_t *stream,
+ serf_bucket_alloc_t *allocator)
+{
+ incoming_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->stream = stream;
+ ctx->body = NULL;
+ ctx->headers = serf_bucket_headers_create(allocator);
+ ctx->state = STATE_STATUS_LINE;
+ ctx->length = 0;
+ ctx->channel = -1;
+ ctx->phrase = NULL;
+
+ serf_linebuf_init(&ctx->linebuf);
+
+ return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx);
+}
+
+static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket)
+{
+ incoming_context_t *ctx = bucket->data;
+
+ if (ctx->state != STATE_STATUS_LINE && ctx->phrase) {
+ serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase);
+ }
+
+ serf_bucket_destroy(ctx->stream);
+ if (ctx->body != NULL)
+ serf_bucket_destroy(ctx->body);
+ serf_bucket_destroy(ctx->headers);
+
+ serf_default_destroy_and_data(bucket);
+}
+
+static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable)
+{
+ return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
+}
+
+static apr_status_t parse_status_line(incoming_context_t *ctx,
+ serf_bucket_alloc_t *allocator)
+{
+ int res;
+ char *reason; /* ### stupid APR interface makes this non-const */
+
+ /* ctx->linebuf.line should be of form: BW* */
+ res = apr_date_checkmask(ctx->linebuf.line, "BW*");
+ if (!res) {
+ /* Not a BWTP response? Well, at least we won't understand it. */
+ return APR_EGENERAL;
+ }
+
+ if (ctx->linebuf.line[2] == 'H') {
+ ctx->type = 0;
+ }
+ else if (ctx->linebuf.line[2] == 'M') {
+ ctx->type = 1;
+ }
+ else {
+ ctx->type = -1;
+ }
+
+ ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16);
+
+ /* Skip leading spaces for the reason string. */
+ if (apr_isspace(*reason)) {
+ reason++;
+ }
+
+ ctx->length = apr_strtoi64(reason, &reason, 16);
+
+ /* Skip leading spaces for the reason string. */
+ if (reason - ctx->linebuf.line < ctx->linebuf.used) {
+ if (apr_isspace(*reason)) {
+ reason++;
+ }
+
+ ctx->phrase = serf_bstrmemdup(allocator, reason,
+ ctx->linebuf.used
+ - (reason - ctx->linebuf.line));
+ } else {
+ ctx->phrase = NULL;
+ }
+
+ return APR_SUCCESS;
+}
+
+/* This code should be replaced with header buckets. */
+static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx)
+{
+ apr_status_t status;
+
+ /* RFC 2616 says that CRLF is the only line ending, but we can easily
+ * accept any kind of line ending.
+ */
+ status = fetch_line(ctx, SERF_NEWLINE_ANY);
+ if (SERF_BUCKET_READ_ERROR(status)) {
+ return status;
+ }
+ /* Something was read. Process it. */
+
+ if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
+ const char *end_key;
+ const char *c;
+
+ end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
+ if (!c) {
+ /* Bad headers? */
+ return APR_EGENERAL;
+ }
+
+ /* Skip over initial : and spaces. */
+ while (apr_isspace(*++c))
+ continue;
+
+ /* Always copy the headers (from the linebuf into new mem). */
+ /* ### we should be able to optimize some mem copies */
+ serf_bucket_headers_setx(
+ ctx->headers,
+ ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
+ c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
+ }
+
+ return status;
+}
+
+/* Perform one iteration of the state machine.
+ *
+ * Will return when one of the following conditions occurs:
+ * 1) a state change
+ * 2) an error
+ * 3) the stream is not ready or at EOF
+ * 4) APR_SUCCESS, meaning the machine can be run again immediately
+ */
+static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx)
+{
+ apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
+
+ switch (ctx->state) {
+ case STATE_STATUS_LINE:
+ /* RFC 2616 says that CRLF is the only line ending, but we can easily
+ * accept any kind of line ending.
+ */
+ status = fetch_line(ctx, SERF_NEWLINE_ANY);
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
+ /* The Status-Line is in the line buffer. Process it. */
+ status = parse_status_line(ctx, bkt->allocator);
+ if (status)
+ return status;
+
+ if (ctx->length) {
+ ctx->body =
+ serf_bucket_barrier_create(ctx->stream, bkt->allocator);
+ ctx->body = serf_bucket_limit_create(ctx->body, ctx->length,
+ bkt->allocator);
+ if (!ctx->type) {
+ ctx->state = STATE_HEADERS;
+ } else {
+ ctx->state = STATE_BODY;
+ }
+ } else {
+ ctx->state = STATE_DONE;
+ }
+ }
+ else {
+ /* The connection closed before we could get the next
+ * response. Treat the request as lost so that our upper
+ * end knows the server never tried to give us a response.
+ */
+ if (APR_STATUS_IS_EOF(status)) {
+ return SERF_ERROR_REQUEST_LOST;
+ }
+ }
+ break;
+ case STATE_HEADERS:
+ status = fetch_headers(ctx->body, ctx);
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ /* If an empty line was read, then we hit the end of the headers.
+ * Move on to the body.
+ */
+ if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
+ /* Advance the state. */
+ ctx->state = STATE_DONE;
+ }
+ break;
+ case STATE_BODY:
+ /* Don't do anything. */
+ break;
+ case STATE_DONE:
+ return APR_EOF;
+ default:
+ /* Not reachable */
+ return APR_EGENERAL;
+ }
+
+ return status;
+}
+
+static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx)
+{
+ apr_status_t status;
+
+ /* Keep reading and moving through states if we aren't at the BODY */
+ while (ctx->state != STATE_BODY) {
+ status = run_machine(bkt, ctx);
+
+ /* Anything other than APR_SUCCESS means that we cannot immediately
+ * read again (for now).
+ */
+ if (status)
+ return status;
+ }
+ /* in STATE_BODY */
+
+ return APR_SUCCESS;
+}
+
+apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers(
+ serf_bucket_t *bucket)
+{
+ incoming_context_t *ctx = bucket->data;
+
+ return wait_for_body(bucket, ctx);
+}
+
+static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ incoming_context_t *ctx = bucket->data;
+ apr_status_t rv;
+
+ rv = wait_for_body(bucket, ctx);
+ if (rv) {
+ /* It's not possible to have read anything yet! */
+ if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
+ *len = 0;
+ }
+ return rv;
+ }
+
+ rv = serf_bucket_read(ctx->body, requested, data, len);
+ if (APR_STATUS_IS_EOF(rv)) {
+ ctx->state = STATE_DONE;
+ }
+ return rv;
+}
+
+static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ incoming_context_t *ctx = bucket->data;
+ apr_status_t rv;
+
+ rv = wait_for_body(bucket, ctx);
+ if (rv) {
+ return rv;
+ }
+
+ /* Delegate to the stream bucket to do the readline. */
+ return serf_bucket_readline(ctx->body, acceptable, found, data, len);
+}
+
+/* ### need to implement */
+#define bwtp_incoming_peek NULL
+
+const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = {
+ "BWTP-INCOMING",
+ bwtp_incoming_read,
+ bwtp_incoming_readline,
+ serf_default_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ bwtp_incoming_peek,
+ bwtp_incoming_destroy_and_data,
+};
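
An outgoing BWTP frame stays a small private context until the first read, at which point serialize_data() converts the bucket into an aggregate of the frame line plus its headers bucket. A sketch of building and draining one channel-open frame; the channel number, URI, and header value are invented for illustration.

#include <stdio.h>
#include "serf.h"

static void emit_open_frame(serf_bucket_alloc_t *alloc)
{
    serf_bucket_t *frame, *hdrs;
    apr_status_t status;

    /* Serializes as "BWH 1 <header-size> OPEN http://example.com/" CRLF
     * followed by the headers. */
    frame = serf_bucket_bwtp_channel_open(1, "http://example.com/", alloc);
    hdrs = serf_bucket_bwtp_frame_get_headers(frame);
    serf_bucket_headers_setx(hdrs, "Content-Type", 12, 1, "text/plain", 10, 1);

    do {
        const char *data;
        apr_size_t len;

        status = serf_bucket_read(frame, SERF_READ_ALL_AVAIL, &data, &len);
        if (SERF_BUCKET_READ_ERROR(status))
            return;
        fwrite(data, 1, len, stdout);
    } while (!APR_STATUS_IS_EOF(status));

    serf_bucket_destroy(frame);
}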
diff --git a/buckets/chunk_buckets.c b/buckets/chunk_buckets.c
new file mode 100644
index 0000000..7f25508
--- /dev/null
+++ b/buckets/chunk_buckets.c
@@ -0,0 +1,235 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_pools.h>
+#include <apr_strings.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+typedef struct {
+ enum {
+ STATE_FETCH,
+ STATE_CHUNK,
+ STATE_EOF
+ } state;
+
+ apr_status_t last_status;
+
+ serf_bucket_t *chunk;
+ serf_bucket_t *stream;
+
+ char chunk_hdr[20];
+} chunk_context_t;
+
+
+serf_bucket_t *serf_bucket_chunk_create(
+ serf_bucket_t *stream, serf_bucket_alloc_t *allocator)
+{
+ chunk_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->state = STATE_FETCH;
+ ctx->chunk = serf_bucket_aggregate_create(allocator);
+ ctx->stream = stream;
+
+ return serf_bucket_create(&serf_bucket_type_chunk, allocator, ctx);
+}
+
+#define CRLF "\r\n"
+
+static apr_status_t create_chunk(serf_bucket_t *bucket)
+{
+ chunk_context_t *ctx = bucket->data;
+ serf_bucket_t *simple_bkt;
+ apr_size_t chunk_len;
+ apr_size_t stream_len;
+ struct iovec vecs[66]; /* 64 + chunk trailer + EOF trailer = 66 */
+ int vecs_read;
+ int i;
+
+ if (ctx->state != STATE_FETCH) {
+ return APR_SUCCESS;
+ }
+
+ ctx->last_status =
+ serf_bucket_read_iovec(ctx->stream, SERF_READ_ALL_AVAIL,
+ 64, vecs, &vecs_read);
+
+ if (SERF_BUCKET_READ_ERROR(ctx->last_status)) {
+ /* Uh-oh. */
+ return ctx->last_status;
+ }
+
+ /* Count the length of the data we read. */
+ stream_len = 0;
+ for (i = 0; i < vecs_read; i++) {
+ stream_len += vecs[i].iov_len;
+ }
+
+ /* assert: stream_len in hex < sizeof(ctx->chunk_hdr) */
+
+ /* Inserting a 0 byte chunk indicates a terminator, which already happens
+ * during the EOF handler below. Adding another one here will cause the
+ * EOF chunk to be interpreted by the server as a new request. So,
+ * we'll only do this if we have something to write.
+ */
+ if (stream_len) {
+ /* Build the chunk header. */
+ chunk_len = apr_snprintf(ctx->chunk_hdr, sizeof(ctx->chunk_hdr),
+ "%" APR_UINT64_T_HEX_FMT CRLF,
+ (apr_uint64_t)stream_len);
+
+ /* Create a copy of the chunk header so we can have multiple chunks
+ * in the pipeline at the same time.
+ */
+ simple_bkt = serf_bucket_simple_copy_create(ctx->chunk_hdr, chunk_len,
+ bucket->allocator);
+ serf_bucket_aggregate_append(ctx->chunk, simple_bkt);
+
+ /* Insert the chunk footer. */
+ vecs[vecs_read].iov_base = CRLF;
+ vecs[vecs_read++].iov_len = sizeof(CRLF) - 1;
+ }
+
+ /* We've reached the end of the line for the stream. */
+ if (APR_STATUS_IS_EOF(ctx->last_status)) {
+ /* Insert the chunk footer. */
+ vecs[vecs_read].iov_base = "0" CRLF CRLF;
+ vecs[vecs_read++].iov_len = sizeof("0" CRLF CRLF) - 1;
+
+ ctx->state = STATE_EOF;
+ }
+ else {
+ /* Okay, we can return data. */
+ ctx->state = STATE_CHUNK;
+ }
+
+ serf_bucket_aggregate_append_iovec(ctx->chunk, vecs, vecs_read);
+
+ return APR_SUCCESS;
+}
+
+static apr_status_t serf_chunk_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ chunk_context_t *ctx = bucket->data;
+ apr_status_t status;
+
+ /* Before proceeding, we need to fetch some data from the stream. */
+ if (ctx->state == STATE_FETCH) {
+ status = create_chunk(bucket);
+ if (status) {
+ return status;
+ }
+ }
+
+ status = serf_bucket_read(ctx->chunk, requested, data, len);
+
+ /* Mask EOF from aggregate bucket. */
+ if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) {
+ status = ctx->last_status;
+ ctx->state = STATE_FETCH;
+ }
+
+ return status;
+}
+
+static apr_status_t serf_chunk_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ chunk_context_t *ctx = bucket->data;
+ apr_status_t status;
+
+ status = serf_bucket_readline(ctx->chunk, acceptable, found, data, len);
+
+ /* Mask EOF from aggregate bucket. */
+ if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) {
+ status = APR_EAGAIN;
+ ctx->state = STATE_FETCH;
+ }
+
+ return status;
+}
+
+static apr_status_t serf_chunk_read_iovec(serf_bucket_t *bucket,
+ apr_size_t requested,
+ int vecs_size,
+ struct iovec *vecs,
+ int *vecs_used)
+{
+ chunk_context_t *ctx = bucket->data;
+ apr_status_t status;
+
+ /* Before proceeding, we need to fetch some data from the stream. */
+ if (ctx->state == STATE_FETCH) {
+ status = create_chunk(bucket);
+ if (status) {
+ return status;
+ }
+ }
+
+ status = serf_bucket_read_iovec(ctx->chunk, requested, vecs_size, vecs,
+ vecs_used);
+
+ /* Mask EOF from aggregate bucket. */
+ if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) {
+ status = ctx->last_status;
+ ctx->state = STATE_FETCH;
+ }
+
+ return status;
+}
+
+static apr_status_t serf_chunk_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+ chunk_context_t *ctx = bucket->data;
+ apr_status_t status;
+
+ status = serf_bucket_peek(ctx->chunk, data, len);
+
+ /* Mask EOF from aggregate bucket. */
+ if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) {
+ status = APR_EAGAIN;
+ }
+
+ return status;
+}
+
+static void serf_chunk_destroy(serf_bucket_t *bucket)
+{
+ chunk_context_t *ctx = bucket->data;
+
+ serf_bucket_destroy(ctx->stream);
+ serf_bucket_destroy(ctx->chunk);
+
+ serf_default_destroy_and_data(bucket);
+}
+
+const serf_bucket_type_t serf_bucket_type_chunk = {
+ "CHUNK",
+ serf_chunk_read,
+ serf_chunk_readline,
+ serf_chunk_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_chunk_peek,
+ serf_chunk_destroy,
+};
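
A minimal usage sketch for the chunk bucket (illustrative only): alloc is
assumed to come from serf_bucket_allocator_create(pool, NULL, NULL), and
error handling is elided.  Draining the bucket yields the HTTP/1.1 chunked
framing for the wrapped body.

#include <stdio.h>
#include <string.h>
#include "serf.h"

static void dump_chunked(serf_bucket_alloc_t *alloc)
{
    serf_bucket_t *body;
    serf_bucket_t *chunked;
    apr_status_t status;

    /* A 12-byte body; draining the chunk bucket writes
     * "c\r\nhello, world\r\n" followed by the terminating "0\r\n\r\n". */
    body = SERF_BUCKET_SIMPLE_STRING("hello, world", alloc);
    chunked = serf_bucket_chunk_create(body, alloc);

    do {
        const char *data;
        apr_size_t len;

        status = serf_bucket_read(chunked, SERF_READ_ALL_AVAIL, &data, &len);
        if (SERF_BUCKET_READ_ERROR(status))
            break;
        fwrite(data, 1, len, stdout);
    } while (!APR_STATUS_IS_EOF(status));

    serf_bucket_destroy(chunked);   /* also destroys the wrapped body bucket */
}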
diff --git a/buckets/dechunk_buckets.c b/buckets/dechunk_buckets.c
new file mode 100644
index 0000000..28dd671
--- /dev/null
+++ b/buckets/dechunk_buckets.c
@@ -0,0 +1,185 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_strings.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+typedef struct {
+ serf_bucket_t *stream;
+
+ enum {
+ STATE_SIZE, /* reading the chunk size */
+ STATE_CHUNK, /* reading the chunk */
+ STATE_TERM, /* reading the chunk terminator */
+ STATE_DONE /* body is done; we've returned EOF */
+ } state;
+
+ /* Buffer for accumulating a chunk size. */
+ serf_linebuf_t linebuf;
+
+ /* How much of the chunk, or the terminator, do we have left to read? */
+ apr_int64_t body_left;
+
+} dechunk_context_t;
+
+
+serf_bucket_t *serf_bucket_dechunk_create(
+ serf_bucket_t *stream,
+ serf_bucket_alloc_t *allocator)
+{
+ dechunk_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->stream = stream;
+ ctx->state = STATE_SIZE;
+
+ serf_linebuf_init(&ctx->linebuf);
+
+ return serf_bucket_create(&serf_bucket_type_dechunk, allocator, ctx);
+}
+
+static void serf_dechunk_destroy_and_data(serf_bucket_t *bucket)
+{
+ dechunk_context_t *ctx = bucket->data;
+
+ serf_bucket_destroy(ctx->stream);
+
+ serf_default_destroy_and_data(bucket);
+}
+
+static apr_status_t serf_dechunk_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ dechunk_context_t *ctx = bucket->data;
+ apr_status_t status;
+
+ while (1) {
+ switch (ctx->state) {
+ case STATE_SIZE:
+
+ /* fetch a line terminated by CRLF */
+ status = serf_linebuf_fetch(&ctx->linebuf, ctx->stream,
+ SERF_NEWLINE_CRLF);
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ /* if a line was read, then parse it. */
+ if (ctx->linebuf.state == SERF_LINEBUF_READY) {
+                /* NUL-terminate the line. If it filled the entire buffer,
+                   then just assume the value is too large. */
+ if (ctx->linebuf.used == sizeof(ctx->linebuf.line))
+ return APR_FROM_OS_ERROR(ERANGE);
+ ctx->linebuf.line[ctx->linebuf.used] = '\0';
+
+ /* convert from HEX digits. */
+ ctx->body_left = apr_strtoi64(ctx->linebuf.line, NULL, 16);
+ if (errno == ERANGE) {
+ return APR_FROM_OS_ERROR(ERANGE);
+ }
+
+ if (ctx->body_left == 0) {
+ /* Just read the last-chunk marker. We're DONE. */
+ ctx->state = STATE_DONE;
+ status = APR_EOF;
+ }
+ else {
+ /* Got a size, so we'll start reading the chunk now. */
+ ctx->state = STATE_CHUNK;
+ }
+
+ /* If we can read more, then go do so. */
+ if (!status)
+ continue;
+ }
+ /* assert: status != 0 */
+
+ /* Note that we didn't actually read anything, so our callers
+ * don't get confused.
+ */
+ *len = 0;
+
+ return status;
+
+ case STATE_CHUNK:
+
+ if (requested > ctx->body_left) {
+ requested = ctx->body_left;
+ }
+
+ /* Delegate to the stream bucket to do the read. */
+ status = serf_bucket_read(ctx->stream, requested, data, len);
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ /* Some data was read, so decrement the amount left and see
+ * if we're done reading this chunk.
+ */
+ ctx->body_left -= *len;
+ if (!ctx->body_left) {
+ ctx->state = STATE_TERM;
+ ctx->body_left = 2; /* CRLF */
+ }
+
+ /* Return the data we just read. */
+ return status;
+
+ case STATE_TERM:
+ /* Delegate to the stream bucket to do the read. */
+ status = serf_bucket_read(ctx->stream, ctx->body_left, data, len);
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ /* Some data was read, so decrement the amount left and see
+ * if we're done reading the chunk terminator.
+ */
+ ctx->body_left -= *len;
+ if (!ctx->body_left) {
+ ctx->state = STATE_SIZE;
+ }
+
+ if (status)
+ return status;
+
+ break;
+
+ case STATE_DONE:
+ /* Just keep returning EOF */
+ return APR_EOF;
+
+ default:
+ /* Not reachable */
+ return APR_EGENERAL;
+ }
+ }
+ /* NOTREACHED */
+}
+
+/* ### need to implement */
+#define serf_dechunk_readline NULL
+#define serf_dechunk_peek NULL
+
+const serf_bucket_type_t serf_bucket_type_dechunk = {
+ "DECHUNK",
+ serf_dechunk_read,
+ serf_dechunk_readline,
+ serf_default_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_dechunk_peek,
+ serf_dechunk_destroy_and_data,
+};
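
A complementary sketch for the dechunk bucket, under the same allocator
assumption: the wire bytes below encode "Wikipedia" as two chunks followed by
the last-chunk marker.

#include <stdio.h>
#include <string.h>
#include "serf.h"

static void dump_dechunked(serf_bucket_alloc_t *alloc)
{
    serf_bucket_t *wire;
    serf_bucket_t *body;
    apr_status_t status;

    wire = SERF_BUCKET_SIMPLE_STRING("4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n",
                                     alloc);
    body = serf_bucket_dechunk_create(wire, alloc);

    do {
        const char *data;
        apr_size_t len;

        status = serf_bucket_read(body, SERF_READ_ALL_AVAIL, &data, &len);
        if (SERF_BUCKET_READ_ERROR(status))
            break;
        fwrite(data, 1, len, stdout);   /* prints "Wikipedia" */
    } while (!APR_STATUS_IS_EOF(status));

    serf_bucket_destroy(body);          /* also destroys the wire bucket */
}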
diff --git a/buckets/deflate_buckets.c b/buckets/deflate_buckets.c
new file mode 100644
index 0000000..7a8e8e4
--- /dev/null
+++ b/buckets/deflate_buckets.c
@@ -0,0 +1,384 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_strings.h>
+
+#include <zlib.h>
+
+/* This conditional isn't defined anywhere yet. */
+#ifdef HAVE_ZUTIL_H
+#include <zutil.h>
+#endif
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+/* magic header */
+static char deflate_magic[2] = { '\037', '\213' };
+#define DEFLATE_MAGIC_SIZE 10
+#define DEFLATE_VERIFY_SIZE 8
+#define DEFLATE_BUFFER_SIZE 8096
+
+static const int DEFLATE_WINDOW_SIZE = -15;
+static const int DEFLATE_MEMLEVEL = 9;
+
+typedef struct {
+ serf_bucket_t *stream;
+ serf_bucket_t *inflate_stream;
+
+ int format; /* Are we 'deflate' or 'gzip'? */
+
+ enum {
+ STATE_READING_HEADER, /* reading the gzip header */
+ STATE_HEADER, /* read the gzip header */
+ STATE_INIT, /* init'ing zlib functions */
+ STATE_INFLATE, /* inflating the content now */
+ STATE_READING_VERIFY, /* reading the final gzip CRC */
+ STATE_VERIFY, /* verifying the final gzip CRC */
+ STATE_FINISH, /* clean up after reading body */
+ STATE_DONE, /* body is done; we'll return EOF here */
+ } state;
+
+ z_stream zstream;
+ char hdr_buffer[DEFLATE_MAGIC_SIZE];
+ unsigned char buffer[DEFLATE_BUFFER_SIZE];
+ unsigned long crc;
+ int windowSize;
+ int memLevel;
+ int bufferSize;
+
+    /* How much of the gzip header or trailer do we have left to read? */
+ apr_size_t stream_left;
+
+ /* How much are we supposed to read? */
+ apr_size_t stream_size;
+
+ int stream_status; /* What was the last status we read? */
+
+} deflate_context_t;
+
+/* Inputs a string and returns a long. */
+static unsigned long getLong(unsigned char *string)
+{
+ return ((unsigned long)string[0])
+ | (((unsigned long)string[1]) << 8)
+ | (((unsigned long)string[2]) << 16)
+ | (((unsigned long)string[3]) << 24);
+}
+
+serf_bucket_t *serf_bucket_deflate_create(
+ serf_bucket_t *stream,
+ serf_bucket_alloc_t *allocator,
+ int format)
+{
+ deflate_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->stream = stream;
+ ctx->stream_status = APR_SUCCESS;
+ ctx->inflate_stream = serf_bucket_aggregate_create(allocator);
+ ctx->format = format;
+ ctx->crc = 0;
+ /* zstream must be NULL'd out. */
+ memset(&ctx->zstream, 0, sizeof(ctx->zstream));
+
+ switch (ctx->format) {
+ case SERF_DEFLATE_GZIP:
+ ctx->state = STATE_READING_HEADER;
+ break;
+ case SERF_DEFLATE_DEFLATE:
+ /* deflate doesn't have a header. */
+ ctx->state = STATE_INIT;
+ break;
+ default:
+ /* Not reachable */
+ return NULL;
+ }
+
+ /* Initial size of gzip header. */
+ ctx->stream_left = ctx->stream_size = DEFLATE_MAGIC_SIZE;
+
+ ctx->windowSize = DEFLATE_WINDOW_SIZE;
+ ctx->memLevel = DEFLATE_MEMLEVEL;
+ ctx->bufferSize = DEFLATE_BUFFER_SIZE;
+
+ return serf_bucket_create(&serf_bucket_type_deflate, allocator, ctx);
+}
+
+static void serf_deflate_destroy_and_data(serf_bucket_t *bucket)
+{
+ deflate_context_t *ctx = bucket->data;
+
+ if (ctx->state > STATE_INIT &&
+ ctx->state <= STATE_FINISH)
+ inflateEnd(&ctx->zstream);
+
+ /* We may have appended inflate_stream into the stream bucket.
+ * If so, avoid free'ing it twice.
+ */
+ if (ctx->inflate_stream) {
+ serf_bucket_destroy(ctx->inflate_stream);
+ }
+ serf_bucket_destroy(ctx->stream);
+
+ serf_default_destroy_and_data(bucket);
+}
+
+static apr_status_t serf_deflate_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ deflate_context_t *ctx = bucket->data;
+ unsigned long compCRC, compLen;
+ apr_status_t status;
+ const char *private_data;
+ apr_size_t private_len;
+ int zRC;
+
+ while (1) {
+ switch (ctx->state) {
+ case STATE_READING_HEADER:
+ case STATE_READING_VERIFY:
+ status = serf_bucket_read(ctx->stream, ctx->stream_left,
+ &private_data, &private_len);
+
+ if (SERF_BUCKET_READ_ERROR(status)) {
+ return status;
+ }
+
+ memcpy(ctx->hdr_buffer + (ctx->stream_size - ctx->stream_left),
+ private_data, private_len);
+
+ ctx->stream_left -= private_len;
+
+ if (ctx->stream_left == 0) {
+ ctx->state++;
+ if (APR_STATUS_IS_EAGAIN(status)) {
+ *len = 0;
+ return status;
+ }
+ }
+ else if (status) {
+ *len = 0;
+ return status;
+ }
+ break;
+ case STATE_HEADER:
+ if (ctx->hdr_buffer[0] != deflate_magic[0] ||
+ ctx->hdr_buffer[1] != deflate_magic[1]) {
+ return SERF_ERROR_DECOMPRESSION_FAILED;
+ }
+ if (ctx->hdr_buffer[3] != 0) {
+ return SERF_ERROR_DECOMPRESSION_FAILED;
+ }
+ ctx->state++;
+ break;
+ case STATE_VERIFY:
+ /* Do the checksum computation. */
+ compCRC = getLong((unsigned char*)ctx->hdr_buffer);
+ if (ctx->crc != compCRC) {
+ return SERF_ERROR_DECOMPRESSION_FAILED;
+ }
+ compLen = getLong((unsigned char*)ctx->hdr_buffer + 4);
+ if (ctx->zstream.total_out != compLen) {
+ return SERF_ERROR_DECOMPRESSION_FAILED;
+ }
+ ctx->state++;
+ break;
+ case STATE_INIT:
+ zRC = inflateInit2(&ctx->zstream, ctx->windowSize);
+ if (zRC != Z_OK) {
+ return SERF_ERROR_DECOMPRESSION_FAILED;
+ }
+ ctx->zstream.next_out = ctx->buffer;
+ ctx->zstream.avail_out = ctx->bufferSize;
+ ctx->state++;
+ break;
+ case STATE_FINISH:
+ inflateEnd(&ctx->zstream);
+ serf_bucket_aggregate_prepend(ctx->stream, ctx->inflate_stream);
+ ctx->inflate_stream = 0;
+ ctx->state++;
+ break;
+ case STATE_INFLATE:
+ /* Do we have anything already uncompressed to read? */
+ status = serf_bucket_read(ctx->inflate_stream, requested, data,
+ len);
+ if (SERF_BUCKET_READ_ERROR(status)) {
+ return status;
+ }
+ /* Hide EOF. */
+ if (APR_STATUS_IS_EOF(status)) {
+ status = ctx->stream_status;
+ if (APR_STATUS_IS_EOF(status)) {
+ /* We've read all of the data from our stream, but we
+ * need to continue to iterate until we flush
+ * out the zlib buffer.
+ */
+ status = APR_SUCCESS;
+ }
+ }
+ if (*len != 0) {
+ return status;
+ }
+
+ /* We tried; but we have nothing buffered. Fetch more. */
+
+ /* It is possible that we maxed out avail_out before
+ * exhausting avail_in; therefore, continue using the
+ * previous buffer. Otherwise, fetch more data from
+ * our stream bucket.
+ */
+ if (ctx->zstream.avail_in == 0) {
+ /* When we empty our inflated stream, we'll return this
+                 * status - this allows us to eventually pass up EAGAINs.
+ */
+ ctx->stream_status = serf_bucket_read(ctx->stream,
+ ctx->bufferSize,
+ &private_data,
+ &private_len);
+
+ if (SERF_BUCKET_READ_ERROR(ctx->stream_status)) {
+ return ctx->stream_status;
+ }
+
+ if (!private_len && APR_STATUS_IS_EAGAIN(ctx->stream_status)) {
+ *len = 0;
+ status = ctx->stream_status;
+ ctx->stream_status = APR_SUCCESS;
+ return status;
+ }
+
+ ctx->zstream.next_in = (unsigned char*)private_data;
+ ctx->zstream.avail_in = private_len;
+ }
+ zRC = Z_OK;
+ while (ctx->zstream.avail_in != 0) {
+ /* We're full, clear out our buffer, reset, and return. */
+ if (ctx->zstream.avail_out == 0) {
+ serf_bucket_t *tmp;
+ ctx->zstream.next_out = ctx->buffer;
+ private_len = ctx->bufferSize - ctx->zstream.avail_out;
+
+ ctx->crc = crc32(ctx->crc, (const Bytef *)ctx->buffer,
+ private_len);
+
+ /* FIXME: There probably needs to be a free func. */
+ tmp = SERF_BUCKET_SIMPLE_STRING_LEN((char *)ctx->buffer,
+ private_len,
+ bucket->allocator);
+ serf_bucket_aggregate_append(ctx->inflate_stream, tmp);
+ ctx->zstream.avail_out = ctx->bufferSize;
+ break;
+ }
+ zRC = inflate(&ctx->zstream, Z_NO_FLUSH);
+
+ if (zRC == Z_STREAM_END) {
+ serf_bucket_t *tmp;
+
+ private_len = ctx->bufferSize - ctx->zstream.avail_out;
+ ctx->crc = crc32(ctx->crc, (const Bytef *)ctx->buffer,
+ private_len);
+ /* FIXME: There probably needs to be a free func. */
+ tmp = SERF_BUCKET_SIMPLE_STRING_LEN((char *)ctx->buffer,
+ private_len,
+ bucket->allocator);
+ serf_bucket_aggregate_append(ctx->inflate_stream, tmp);
+
+ ctx->zstream.avail_out = ctx->bufferSize;
+
+ /* Push back the remaining data to be read. */
+ tmp = serf_bucket_aggregate_create(bucket->allocator);
+ serf_bucket_aggregate_prepend(tmp, ctx->stream);
+ ctx->stream = tmp;
+
+ /* We now need to take the remaining avail_in and
+ * throw it in ctx->stream so our next read picks it up.
+ */
+ tmp = SERF_BUCKET_SIMPLE_STRING_LEN(
+ (const char*)ctx->zstream.next_in,
+ ctx->zstream.avail_in,
+ bucket->allocator);
+ serf_bucket_aggregate_prepend(ctx->stream, tmp);
+
+ switch (ctx->format) {
+ case SERF_DEFLATE_GZIP:
+ ctx->stream_left = ctx->stream_size =
+ DEFLATE_VERIFY_SIZE;
+ ctx->state++;
+ break;
+ case SERF_DEFLATE_DEFLATE:
+ /* Deflate does not have a verify footer. */
+ ctx->state = STATE_FINISH;
+ break;
+ default:
+ /* Not reachable */
+ return APR_EGENERAL;
+ }
+
+ break;
+ }
+ if (zRC != Z_OK) {
+ return SERF_ERROR_DECOMPRESSION_FAILED;
+ }
+ }
+ /* Okay, we've inflated. Try to read. */
+ status = serf_bucket_read(ctx->inflate_stream, requested, data,
+ len);
+ /* Hide EOF. */
+ if (APR_STATUS_IS_EOF(status)) {
+ status = ctx->stream_status;
+ /* If our stream is finished too, return SUCCESS so
+ * we'll iterate one more time.
+ */
+ if (APR_STATUS_IS_EOF(status)) {
+ /* No more data to read from the stream, and everything
+ inflated. If all data was received correctly, state
+ should have been advanced to STATE_READING_VERIFY or
+ STATE_FINISH. If not, then the data was incomplete
+ and we have an error. */
+ if (ctx->state != STATE_INFLATE)
+ return APR_SUCCESS;
+ else
+ return SERF_ERROR_DECOMPRESSION_FAILED;
+ }
+ }
+ return status;
+ case STATE_DONE:
+ /* We're done inflating. Use our finished buffer. */
+ return serf_bucket_read(ctx->stream, requested, data, len);
+ default:
+ /* Not reachable */
+ return APR_EGENERAL;
+ }
+ }
+
+ /* NOTREACHED */
+}
+
+/* ### need to implement */
+#define serf_deflate_readline NULL
+#define serf_deflate_peek NULL
+
+const serf_bucket_type_t serf_bucket_type_deflate = {
+ "DEFLATE",
+ serf_deflate_read,
+ serf_deflate_readline,
+ serf_default_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_deflate_peek,
+ serf_deflate_destroy_and_data,
+};
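
A sketch of how the deflate bucket is meant to be used: wrap whatever bucket
delivers the compressed octets (typically a response body) and read plain
text out of it.  The compressed bucket and the consume() sink here are
hypothetical placeholders; alloc is an allocator as in the earlier sketches.

#include "serf.h"

static void gunzip_stream(serf_bucket_t *compressed,
                          serf_bucket_alloc_t *alloc,
                          void (*consume)(const char *, apr_size_t))
{
    serf_bucket_t *plain;
    apr_status_t status;

    plain = serf_bucket_deflate_create(compressed, alloc, SERF_DEFLATE_GZIP);

    do {
        const char *data;
        apr_size_t len;

        status = serf_bucket_read(plain, SERF_READ_ALL_AVAIL, &data, &len);
        if (SERF_BUCKET_READ_ERROR(status))
            break;                /* e.g. SERF_ERROR_DECOMPRESSION_FAILED */
        consume(data, len);       /* placeholder sink for the inflated bytes */
    } while (!APR_STATUS_IS_EOF(status));

    serf_bucket_destroy(plain);   /* also destroys the wrapped stream */
}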
diff --git a/buckets/file_buckets.c b/buckets/file_buckets.c
new file mode 100644
index 0000000..bd41cab
--- /dev/null
+++ b/buckets/file_buckets.c
@@ -0,0 +1,117 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_pools.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+typedef struct {
+ apr_file_t *file;
+
+ serf_databuf_t databuf;
+
+} file_context_t;
+
+
+static apr_status_t file_reader(void *baton, apr_size_t bufsize,
+ char *buf, apr_size_t *len)
+{
+ file_context_t *ctx = baton;
+
+ *len = bufsize;
+ return apr_file_read(ctx->file, buf, len);
+}
+
+serf_bucket_t *serf_bucket_file_create(
+ apr_file_t *file,
+ serf_bucket_alloc_t *allocator)
+{
+ file_context_t *ctx;
+#if APR_HAS_MMAP
+ apr_finfo_t finfo;
+ const char *file_path;
+
+ /* See if we'd be better off mmap'ing this file instead.
+ *
+ * Note that there is a failure case here that we purposely fall through:
+ * if a file is buffered, apr_mmap will reject it. However, on older
+ * versions of APR, we have no way of knowing this - but apr_mmap_create
+ * will check for this and return APR_EBADF.
+ */
+ apr_file_name_get(&file_path, file);
+ apr_stat(&finfo, file_path, APR_FINFO_SIZE,
+ serf_bucket_allocator_get_pool(allocator));
+ if (APR_MMAP_CANDIDATE(finfo.size)) {
+ apr_status_t status;
+ apr_mmap_t *file_mmap;
+ status = apr_mmap_create(&file_mmap, file, 0, finfo.size,
+ APR_MMAP_READ,
+ serf_bucket_allocator_get_pool(allocator));
+
+ if (status == APR_SUCCESS) {
+ return serf_bucket_mmap_create(file_mmap, allocator);
+ }
+ }
+#endif
+
+ /* Oh, well. */
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->file = file;
+
+ serf_databuf_init(&ctx->databuf);
+ ctx->databuf.read = file_reader;
+ ctx->databuf.read_baton = ctx;
+
+ return serf_bucket_create(&serf_bucket_type_file, allocator, ctx);
+}
+
+static apr_status_t serf_file_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ file_context_t *ctx = bucket->data;
+
+ return serf_databuf_read(&ctx->databuf, requested, data, len);
+}
+
+static apr_status_t serf_file_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ file_context_t *ctx = bucket->data;
+
+ return serf_databuf_readline(&ctx->databuf, acceptable, found, data, len);
+}
+
+static apr_status_t serf_file_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+ file_context_t *ctx = bucket->data;
+
+ return serf_databuf_peek(&ctx->databuf, data, len);
+}
+
+const serf_bucket_type_t serf_bucket_type_file = {
+ "FILE",
+ serf_file_read,
+ serf_file_readline,
+ serf_default_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_file_peek,
+ serf_default_destroy_and_data,
+};
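
A usage sketch for the file bucket, assuming an APR pool and an allocator
created from it; the path is purely illustrative.  Note that, per the
APR_HAS_MMAP branch above, the constructor may hand back an mmap bucket
instead when the file qualifies; callers cannot tell the difference.

#include <stdio.h>
#include <apr_file_io.h>
#include "serf.h"

static void dump_file(apr_pool_t *pool, serf_bucket_alloc_t *alloc)
{
    apr_file_t *file;
    serf_bucket_t *bkt;
    apr_status_t status;

    status = apr_file_open(&file, "/tmp/example.txt",
                           APR_READ | APR_BINARY, APR_OS_DEFAULT, pool);
    if (status != APR_SUCCESS)
        return;

    bkt = serf_bucket_file_create(file, alloc);

    do {
        const char *data;
        apr_size_t len;

        status = serf_bucket_read(bkt, SERF_READ_ALL_AVAIL, &data, &len);
        if (SERF_BUCKET_READ_ERROR(status))
            break;
        fwrite(data, 1, len, stdout);
    } while (!APR_STATUS_IS_EOF(status));

    serf_bucket_destroy(bkt);
}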
diff --git a/buckets/headers_buckets.c b/buckets/headers_buckets.c
new file mode 100644
index 0000000..8bf91b4
--- /dev/null
+++ b/buckets/headers_buckets.c
@@ -0,0 +1,429 @@
+/* Copyright 2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+
+#include <apr_general.h> /* for strcasecmp() */
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+typedef struct header_list {
+ const char *header;
+ const char *value;
+
+ apr_size_t header_size;
+ apr_size_t value_size;
+
+ int alloc_flags;
+#define ALLOC_HEADER 0x0001 /* header lives in our allocator */
+#define ALLOC_VALUE 0x0002 /* value lives in our allocator */
+
+ struct header_list *next;
+} header_list_t;
+
+typedef struct {
+ header_list_t *list;
+
+ header_list_t *cur_read;
+ enum {
+ READ_START, /* haven't started reading yet */
+ READ_HEADER, /* reading cur_read->header */
+ READ_SEP, /* reading ": " */
+ READ_VALUE, /* reading cur_read->value */
+ READ_CRLF, /* reading "\r\n" */
+ READ_TERM, /* reading the final "\r\n" */
+ READ_DONE /* no more data to read */
+ } state;
+ apr_size_t amt_read; /* how much of the current state we've read */
+
+} headers_context_t;
+
+
+serf_bucket_t *serf_bucket_headers_create(
+ serf_bucket_alloc_t *allocator)
+{
+ headers_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->list = NULL;
+ ctx->state = READ_START;
+
+ return serf_bucket_create(&serf_bucket_type_headers, allocator, ctx);
+}
+
+void serf_bucket_headers_setx(
+ serf_bucket_t *bkt,
+ const char *header, apr_size_t header_size, int header_copy,
+ const char *value, apr_size_t value_size, int value_copy)
+{
+ headers_context_t *ctx = bkt->data;
+ header_list_t *iter = ctx->list;
+ header_list_t *hdr;
+
+#if 0
+ /* ### include this? */
+ if (ctx->cur_read) {
+ /* we started reading. can't change now. */
+ abort();
+ }
+#endif
+
+ hdr = serf_bucket_mem_alloc(bkt->allocator, sizeof(*hdr));
+ hdr->header_size = header_size;
+ hdr->value_size = value_size;
+ hdr->alloc_flags = 0;
+ hdr->next = NULL;
+
+ if (header_copy) {
+ hdr->header = serf_bstrmemdup(bkt->allocator, header, header_size);
+ hdr->alloc_flags |= ALLOC_HEADER;
+ }
+ else {
+ hdr->header = header;
+ }
+
+ if (value_copy) {
+ hdr->value = serf_bstrmemdup(bkt->allocator, value, value_size);
+ hdr->alloc_flags |= ALLOC_VALUE;
+ }
+ else {
+ hdr->value = value;
+ }
+
+ /* Add the new header at the end of the list. */
+ while (iter && iter->next) {
+ iter = iter->next;
+ }
+ if (iter)
+ iter->next = hdr;
+ else
+ ctx->list = hdr;
+}
+
+void serf_bucket_headers_set(
+ serf_bucket_t *headers_bucket,
+ const char *header,
+ const char *value)
+{
+ serf_bucket_headers_setx(headers_bucket,
+ header, strlen(header), 0,
+ value, strlen(value), 1);
+}
+
+void serf_bucket_headers_setc(
+ serf_bucket_t *headers_bucket,
+ const char *header,
+ const char *value)
+{
+ serf_bucket_headers_setx(headers_bucket,
+ header, strlen(header), 1,
+ value, strlen(value), 1);
+}
+
+void serf_bucket_headers_setn(
+ serf_bucket_t *headers_bucket,
+ const char *header,
+ const char *value)
+{
+ serf_bucket_headers_setx(headers_bucket,
+ header, strlen(header), 0,
+ value, strlen(value), 0);
+}
+
+const char *serf_bucket_headers_get(
+ serf_bucket_t *headers_bucket,
+ const char *header)
+{
+ headers_context_t *ctx = headers_bucket->data;
+ header_list_t *found = ctx->list;
+ const char *val = NULL;
+ int value_size = 0;
+ int val_alloc = 0;
+
+ while (found) {
+ if (strcasecmp(found->header, header) == 0) {
+ if (val) {
+ /* The header is already present. RFC 2616, section 4.2
+ indicates that we should append the new value, separated by
+ a comma. Reasoning: for headers whose values are known to
+ be comma-separated, that is clearly the correct behavior;
+ for others, the correct behavior is undefined anyway. */
+
+ /* The "+1" is for the comma; serf_bstrmemdup() will also add
+ one slot for the terminating '\0'. */
+ apr_size_t new_size = found->value_size + value_size + 1;
+ char *new_val = serf_bucket_mem_alloc(headers_bucket->allocator,
+ new_size);
+ memcpy(new_val, val, value_size);
+ new_val[value_size] = ',';
+ memcpy(new_val + value_size + 1, found->value,
+ found->value_size);
+ new_val[new_size] = '\0';
+ /* Copy the new value over the already existing value. */
+ if (val_alloc)
+ serf_bucket_mem_free(headers_bucket->allocator, (void*)val);
+ val_alloc |= ALLOC_VALUE;
+ val = new_val;
+ value_size = new_size;
+ }
+ else {
+ val = found->value;
+ value_size = found->value_size;
+ }
+ }
+ found = found->next;
+ }
+
+ return val;
+}
+
+void serf_bucket_headers_do(
+ serf_bucket_t *headers_bucket,
+ serf_bucket_headers_do_callback_fn_t func,
+ void *baton)
+{
+ headers_context_t *ctx = headers_bucket->data;
+ header_list_t *scan = ctx->list;
+
+ while (scan) {
+ if (func(baton, scan->header, scan->value) != 0) {
+ break;
+ }
+ scan = scan->next;
+ }
+}
+
+static void serf_headers_destroy_and_data(serf_bucket_t *bucket)
+{
+ headers_context_t *ctx = bucket->data;
+ header_list_t *scan = ctx->list;
+
+ while (scan) {
+ header_list_t *next_hdr = scan->next;
+
+ if (scan->alloc_flags & ALLOC_HEADER)
+ serf_bucket_mem_free(bucket->allocator, (void *)scan->header);
+ if (scan->alloc_flags & ALLOC_VALUE)
+ serf_bucket_mem_free(bucket->allocator, (void *)scan->value);
+ serf_bucket_mem_free(bucket->allocator, scan);
+
+ scan = next_hdr;
+ }
+
+ serf_default_destroy_and_data(bucket);
+}
+
+static void select_value(
+ headers_context_t *ctx,
+ const char **value,
+ apr_size_t *len)
+{
+ const char *v;
+ apr_size_t l;
+
+ if (ctx->state == READ_START) {
+ if (ctx->list == NULL) {
+ /* No headers. Move straight to the TERM state. */
+ ctx->state = READ_TERM;
+ }
+ else {
+ ctx->state = READ_HEADER;
+ ctx->cur_read = ctx->list;
+ }
+ ctx->amt_read = 0;
+ }
+
+ switch (ctx->state) {
+ case READ_HEADER:
+ v = ctx->cur_read->header;
+ l = ctx->cur_read->header_size;
+ break;
+ case READ_SEP:
+ v = ": ";
+ l = 2;
+ break;
+ case READ_VALUE:
+ v = ctx->cur_read->value;
+ l = ctx->cur_read->value_size;
+ break;
+ case READ_CRLF:
+ case READ_TERM:
+ v = "\r\n";
+ l = 2;
+ break;
+ case READ_DONE:
+ *len = 0;
+ return;
+ default:
+ /* Not reachable */
+ return;
+ }
+
+ *value = v + ctx->amt_read;
+ *len = l - ctx->amt_read;
+}
+
+/* the current data chunk has been read/consumed. move our internal state. */
+static apr_status_t consume_chunk(headers_context_t *ctx)
+{
+ /* move to the next state, resetting the amount read. */
+ ++ctx->state;
+ ctx->amt_read = 0;
+
+ /* just sent the terminator and moved to DONE. signal completion. */
+ if (ctx->state == READ_DONE)
+ return APR_EOF;
+
+ /* end of this header. move to the next one. */
+ if (ctx->state == READ_TERM) {
+ ctx->cur_read = ctx->cur_read->next;
+ if (ctx->cur_read != NULL) {
+            /* We've got another header to send. Reset the read state. */
+ ctx->state = READ_HEADER;
+ }
+ /* else leave in READ_TERM */
+ }
+
+ /* there is more data which can be read immediately. */
+ return APR_SUCCESS;
+}
+
+static apr_status_t serf_headers_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+ headers_context_t *ctx = bucket->data;
+
+ select_value(ctx, data, len);
+
+ /* already done or returning the CRLF terminator? return EOF */
+ if (ctx->state == READ_DONE || ctx->state == READ_TERM)
+ return APR_EOF;
+
+ return APR_SUCCESS;
+}
+
+static apr_status_t serf_headers_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ headers_context_t *ctx = bucket->data;
+ apr_size_t avail;
+
+ select_value(ctx, data, &avail);
+ if (ctx->state == READ_DONE)
+ return APR_EOF;
+
+ if (requested >= avail) {
+ /* return everything from this chunk */
+ *len = avail;
+
+ /* we consumed this chunk. advance the state. */
+ return consume_chunk(ctx);
+ }
+
+ /* return just the amount requested, and advance our pointer */
+ *len = requested;
+ ctx->amt_read += requested;
+
+ /* there is more that can be read immediately */
+ return APR_SUCCESS;
+}
+
+static apr_status_t serf_headers_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ headers_context_t *ctx = bucket->data;
+ apr_status_t status;
+
+ /* ### what behavior should we use here? APR_EGENERAL for now */
+ if ((acceptable & SERF_NEWLINE_CRLF) == 0)
+ return APR_EGENERAL;
+
+ /* get whatever is in this chunk */
+ select_value(ctx, data, len);
+ if (ctx->state == READ_DONE)
+ return APR_EOF;
+
+ /* we consumed this chunk. advance the state. */
+ status = consume_chunk(ctx);
+
+ /* the type of newline found is easy... */
+ *found = (ctx->state == READ_CRLF || ctx->state == READ_TERM)
+ ? SERF_NEWLINE_CRLF : SERF_NEWLINE_NONE;
+
+ return status;
+}
+
+static apr_status_t serf_headers_read_iovec(serf_bucket_t *bucket,
+ apr_size_t requested,
+ int vecs_size,
+ struct iovec *vecs,
+ int *vecs_used)
+{
+ apr_size_t avail = requested;
+ int i;
+
+ *vecs_used = 0;
+
+ for (i = 0; i < vecs_size; i++) {
+ const char *data;
+ apr_size_t len;
+ apr_status_t status;
+
+        /* Calling read() repeatedly would not be a safe optimization in the
+         * general case, but it is here for the headers bucket, as it only
+         * frees the header keys and values when the entire bucket goes away
+         * - not on a per-read() basis as is normally the case.
+         */
+ status = serf_headers_read(bucket, avail, &data, &len);
+
+ if (len) {
+ vecs[*vecs_used].iov_base = (char*)data;
+ vecs[*vecs_used].iov_len = len;
+
+ (*vecs_used)++;
+
+ if (avail != SERF_READ_ALL_AVAIL) {
+ avail -= len;
+
+ /* If we reach 0, then read()'s status will suffice. */
+ if (avail == 0) {
+ return status;
+ }
+ }
+ }
+
+ if (status) {
+ return status;
+ }
+ }
+
+ return APR_SUCCESS;
+}
+
+const serf_bucket_type_t serf_bucket_type_headers = {
+ "HEADERS",
+ serf_headers_read,
+ serf_headers_readline,
+ serf_headers_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_headers_peek,
+ serf_headers_destroy_and_data,
+};
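
A short sketch of the headers bucket API under the same allocator assumption,
showing the copying and non-copying setters, the comma-folding behaviour of
get(), and the do() iterator.

#include <stdio.h>
#include "serf.h"

static int print_header(void *baton, const char *key, const char *value)
{
    printf("%s: %s\n", key, value);
    return 0;                     /* returning non-zero stops the iteration */
}

static void headers_demo(serf_bucket_alloc_t *alloc)
{
    serf_bucket_t *hdrs = serf_bucket_headers_create(alloc);

    serf_bucket_headers_setc(hdrs, "Host", "example.org");      /* copy both */
    serf_bucket_headers_setn(hdrs, "Connection", "keep-alive"); /* copy neither */
    serf_bucket_headers_setc(hdrs, "Accept", "text/html");
    serf_bucket_headers_setc(hdrs, "Accept", "text/plain");

    /* Repeated headers are folded with a comma (RFC 2616, section 4.2):
     * this prints "text/html,text/plain". */
    printf("Accept: %s\n", serf_bucket_headers_get(hdrs, "Accept"));

    /* Visit every header/value pair in insertion order. */
    serf_bucket_headers_do(hdrs, print_header, NULL);

    /* Reading the bucket itself would yield the serialized header block,
     * terminated by the final blank line. */
    serf_bucket_destroy(hdrs);
}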
diff --git a/buckets/iovec_buckets.c b/buckets/iovec_buckets.c
new file mode 100644
index 0000000..9ac1d8d
--- /dev/null
+++ b/buckets/iovec_buckets.c
@@ -0,0 +1,169 @@
+/* Copyright 2011 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_pools.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+typedef struct {
+ struct iovec *vecs;
+
+    /* Total number of buffers stored in the vecs array. */
+ int vecs_len;
+ /* Points to the first unread buffer. */
+ int current_vec;
+ /* First buffer offset. */
+ int offset;
+} iovec_context_t;
+
+serf_bucket_t *serf_bucket_iovec_create(
+ struct iovec vecs[],
+ int len,
+ serf_bucket_alloc_t *allocator)
+{
+ iovec_context_t *ctx;
+ int i;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->vecs = serf_bucket_mem_alloc(allocator, len * sizeof(struct iovec));
+ ctx->vecs_len = len;
+ ctx->current_vec = 0;
+ ctx->offset = 0;
+
+ /* copy all buffers to our iovec. */
+ for (i = 0; i < len; i++) {
+ ctx->vecs[i].iov_base = vecs[i].iov_base;
+ ctx->vecs[i].iov_len = vecs[i].iov_len;
+ }
+
+ return serf_bucket_create(&serf_bucket_type_iovec, allocator, ctx);
+}
+
+static apr_status_t serf_iovec_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ return APR_ENOTIMPL;
+}
+
+static apr_status_t serf_iovec_read_iovec(serf_bucket_t *bucket,
+ apr_size_t requested,
+ int vecs_size,
+ struct iovec *vecs,
+ int *vecs_used)
+{
+ iovec_context_t *ctx = bucket->data;
+
+ *vecs_used = 0;
+
+ /* copy the requested amount of buffers to the provided iovec. */
+ for (; ctx->current_vec < ctx->vecs_len; ctx->current_vec++) {
+ struct iovec vec = ctx->vecs[ctx->current_vec];
+ apr_size_t remaining;
+
+ if (requested != SERF_READ_ALL_AVAIL && requested <= 0)
+ break;
+ if (*vecs_used >= vecs_size)
+ break;
+
+ vecs[*vecs_used].iov_base = (char*)vec.iov_base + ctx->offset;
+ remaining = vec.iov_len - ctx->offset;
+
+        /* Fewer bytes requested than remain in the current buffer. */
+ if (requested != SERF_READ_ALL_AVAIL && requested < remaining) {
+ vecs[*vecs_used].iov_len = requested;
+ ctx->offset += requested;
+ requested = 0;
+ (*vecs_used)++;
+ break;
+ } else {
+ /* Copy the complete buffer. */
+ vecs[*vecs_used].iov_len = remaining;
+ ctx->offset = 0;
+ if (requested != SERF_READ_ALL_AVAIL)
+ requested -= remaining;
+ (*vecs_used)++;
+ }
+ }
+
+ if (ctx->current_vec == ctx->vecs_len && !ctx->offset)
+ return APR_EOF;
+
+ return APR_SUCCESS;
+}
+
+static apr_status_t serf_iovec_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ struct iovec vec[1];
+ apr_status_t status;
+ int vecs_used;
+
+ status = serf_iovec_read_iovec(bucket, requested, 1, vec, &vecs_used);
+
+ if (vecs_used) {
+ *data = vec[0].iov_base;
+ *len = vec[0].iov_len;
+ } else {
+ *len = 0;
+ }
+
+ return status;
+}
+
+static apr_status_t serf_iovec_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+ iovec_context_t *ctx = bucket->data;
+
+ if (ctx->current_vec >= ctx->vecs_len) {
+ *len = 0;
+ return APR_EOF;
+ }
+
+ /* Return the first unread buffer, don't bother combining all
+ remaining data. */
+ *data = ctx->vecs[ctx->current_vec].iov_base;
+ *len = ctx->vecs[ctx->current_vec].iov_len;
+
+ if (ctx->current_vec + 1 == ctx->vecs_len)
+ return APR_EOF;
+
+ return APR_SUCCESS;
+}
+
+static void serf_iovec_destroy(serf_bucket_t *bucket)
+{
+ iovec_context_t *ctx = bucket->data;
+
+ serf_bucket_mem_free(bucket->allocator, ctx->vecs);
+ serf_default_destroy_and_data(bucket);
+}
+
+
+const serf_bucket_type_t serf_bucket_type_iovec = {
+ "IOVEC",
+ serf_iovec_read,
+ serf_iovec_readline,
+ serf_iovec_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_iovec_peek,
+ serf_iovec_destroy,
+};
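
A sketch for the iovec bucket under the same allocator assumption.  Only the
iovec array is copied at creation time; the underlying buffers must outlive
the bucket.

#include "serf.h"

static void iovec_demo(serf_bucket_alloc_t *alloc)
{
    struct iovec in[2];
    struct iovec out[2];
    serf_bucket_t *bkt;
    apr_status_t status;
    int used;

    in[0].iov_base = (char *)"Hello, ";
    in[0].iov_len  = 7;
    in[1].iov_base = (char *)"world";
    in[1].iov_len  = 5;

    bkt = serf_bucket_iovec_create(in, 2, alloc);

    status = serf_bucket_read_iovec(bkt, SERF_READ_ALL_AVAIL, 2, out, &used);
    /* used == 2, out[] mirrors the input buffers, status == APR_EOF */

    serf_bucket_destroy(bkt);
}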
diff --git a/buckets/limit_buckets.c b/buckets/limit_buckets.c
new file mode 100644
index 0000000..d2e6166
--- /dev/null
+++ b/buckets/limit_buckets.c
@@ -0,0 +1,134 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_pools.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+/* Older versions of APR do not have this macro. */
+#ifdef APR_SIZE_MAX
+#define REQUESTED_MAX APR_SIZE_MAX
+#else
+#define REQUESTED_MAX (~((apr_size_t)0))
+#endif
+
+
+typedef struct {
+ serf_bucket_t *stream;
+ apr_uint64_t remaining;
+} limit_context_t;
+
+
+serf_bucket_t *serf_bucket_limit_create(
+ serf_bucket_t *stream, apr_uint64_t len, serf_bucket_alloc_t *allocator)
+{
+ limit_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->stream = stream;
+ ctx->remaining = len;
+
+ return serf_bucket_create(&serf_bucket_type_limit, allocator, ctx);
+}
+
+static apr_status_t serf_limit_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ limit_context_t *ctx = bucket->data;
+ apr_status_t status;
+
+ if (!ctx->remaining) {
+ *len = 0;
+ return APR_EOF;
+ }
+
+ if (requested == SERF_READ_ALL_AVAIL || requested > ctx->remaining) {
+ if (ctx->remaining <= REQUESTED_MAX) {
+ requested = (apr_size_t) ctx->remaining;
+ } else {
+ requested = REQUESTED_MAX;
+ }
+ }
+
+ status = serf_bucket_read(ctx->stream, requested, data, len);
+
+ if (!SERF_BUCKET_READ_ERROR(status)) {
+ ctx->remaining -= *len;
+ }
+
+ /* If we have met our limit and don't have a status, return EOF. */
+ if (!ctx->remaining && !status) {
+ status = APR_EOF;
+ }
+
+ return status;
+}
+
+static apr_status_t serf_limit_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ limit_context_t *ctx = bucket->data;
+ apr_status_t status;
+
+ if (!ctx->remaining) {
+ *len = 0;
+ return APR_EOF;
+ }
+
+ status = serf_bucket_readline(ctx->stream, acceptable, found, data, len);
+
+ if (!SERF_BUCKET_READ_ERROR(status)) {
+ ctx->remaining -= *len;
+ }
+
+ /* If we have met our limit and don't have a status, return EOF. */
+ if (!ctx->remaining && !status) {
+ status = APR_EOF;
+ }
+
+ return status;
+}
+
+static apr_status_t serf_limit_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+ limit_context_t *ctx = bucket->data;
+
+ return serf_bucket_peek(ctx->stream, data, len);
+}
+
+static void serf_limit_destroy(serf_bucket_t *bucket)
+{
+ limit_context_t *ctx = bucket->data;
+
+ serf_bucket_destroy(ctx->stream);
+
+ serf_default_destroy_and_data(bucket);
+}
+
+const serf_bucket_type_t serf_bucket_type_limit = {
+ "LIMIT",
+ serf_limit_read,
+ serf_limit_readline,
+ serf_default_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_limit_peek,
+ serf_limit_destroy,
+};
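
A sketch for the limit bucket under the same allocator assumption.  The
wrapped bucket may hold more data, but readers only ever see the first len
bytes; the response bucket below uses exactly this to enforce Content-Length.

#include <string.h>
#include "serf.h"

static void limit_demo(serf_bucket_alloc_t *alloc)
{
    serf_bucket_t *raw;
    serf_bucket_t *limited;
    const char *data;
    apr_size_t len;
    apr_status_t status;

    raw = SERF_BUCKET_SIMPLE_STRING("Hello, world!", alloc);
    limited = serf_bucket_limit_create(raw, 5, alloc);

    status = serf_bucket_read(limited, SERF_READ_ALL_AVAIL, &data, &len);
    /* len == 5 (just "Hello"), status == APR_EOF: the limit was reached */

    serf_bucket_destroy(limited);   /* also destroys the wrapped bucket */
}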
diff --git a/buckets/mmap_buckets.c b/buckets/mmap_buckets.c
new file mode 100644
index 0000000..f55a76b
--- /dev/null
+++ b/buckets/mmap_buckets.c
@@ -0,0 +1,118 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_pools.h>
+#include <apr_mmap.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+typedef struct {
+ apr_mmap_t *mmap;
+ void *current;
+ apr_off_t offset;
+ apr_off_t remaining;
+} mmap_context_t;
+
+
+serf_bucket_t *serf_bucket_mmap_create(
+ apr_mmap_t *file_mmap,
+ serf_bucket_alloc_t *allocator)
+{
+ mmap_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->mmap = file_mmap;
+ ctx->current = NULL;
+ ctx->offset = 0;
+ ctx->remaining = ctx->mmap->size;
+
+ return serf_bucket_create(&serf_bucket_type_mmap, allocator, ctx);
+}
+
+static apr_status_t serf_mmap_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ mmap_context_t *ctx = bucket->data;
+
+ if (requested == SERF_READ_ALL_AVAIL || requested > ctx->remaining) {
+ *len = ctx->remaining;
+ }
+ else {
+ *len = requested;
+ }
+
+ /* ### Would it be faster to call this once and do the offset ourselves? */
+ apr_mmap_offset((void**)data, ctx->mmap, ctx->offset);
+
+ /* For the next read... */
+ ctx->offset += *len;
+ ctx->remaining -= *len;
+
+ if (ctx->remaining == 0) {
+ return APR_EOF;
+ }
+ return APR_SUCCESS;
+}
+
+static apr_status_t serf_mmap_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ mmap_context_t *ctx = bucket->data;
+ const char *end;
+
+ /* ### Would it be faster to call this once and do the offset ourselves? */
+ apr_mmap_offset((void**)data, ctx->mmap, ctx->offset);
+ end = *data;
+
+    /* XXX An overflow is generated if we pass &ctx->remaining to readline,
+     * presumably because remaining is an apr_off_t while readline wants an
+     * apr_size_t.  Go through *len instead.
+     */
+ *len = ctx->remaining;
+
+ serf_util_readline(&end, len, acceptable, found);
+
+ *len = end - *data;
+
+ ctx->offset += *len;
+ ctx->remaining -= *len;
+
+ if (ctx->remaining == 0) {
+ return APR_EOF;
+ }
+ return APR_SUCCESS;
+}
+
+static apr_status_t serf_mmap_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+ /* Oh, bah. */
+ return APR_ENOTIMPL;
+}
+
+const serf_bucket_type_t serf_bucket_type_mmap = {
+ "MMAP",
+ serf_mmap_read,
+ serf_mmap_readline,
+ serf_default_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_mmap_peek,
+ serf_default_destroy_and_data,
+};
diff --git a/buckets/request_buckets.c b/buckets/request_buckets.c
new file mode 100644
index 0000000..be010c0
--- /dev/null
+++ b/buckets/request_buckets.c
@@ -0,0 +1,228 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_pools.h>
+#include <apr_strings.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+typedef struct {
+ const char *method;
+ const char *uri;
+ serf_bucket_t *headers;
+ serf_bucket_t *body;
+ apr_int64_t len;
+} request_context_t;
+
+#define LENGTH_UNKNOWN ((apr_int64_t)-1)
+
+
+serf_bucket_t *serf_bucket_request_create(
+ const char *method,
+ const char *URI,
+ serf_bucket_t *body,
+ serf_bucket_alloc_t *allocator)
+{
+ request_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->method = method;
+ ctx->uri = URI;
+ ctx->headers = serf_bucket_headers_create(allocator);
+ ctx->body = body;
+ ctx->len = LENGTH_UNKNOWN;
+
+ return serf_bucket_create(&serf_bucket_type_request, allocator, ctx);
+}
+
+void serf_bucket_request_set_CL(
+ serf_bucket_t *bucket,
+ apr_int64_t len)
+{
+ request_context_t *ctx = (request_context_t *)bucket->data;
+
+ ctx->len = len;
+}
+
+serf_bucket_t *serf_bucket_request_get_headers(
+ serf_bucket_t *bucket)
+{
+ return ((request_context_t *)bucket->data)->headers;
+}
+
+void serf_bucket_request_set_root(
+ serf_bucket_t *bucket,
+ const char *root_url)
+{
+ request_context_t *ctx = (request_context_t *)bucket->data;
+
+ /* If uri is already absolute, don't change it. */
+ if (ctx->uri[0] != '/')
+ return;
+
+ /* If uri is '/' replace it with root_url. */
+ if (ctx->uri[1] == '\0')
+ ctx->uri = root_url;
+ else
+ ctx->uri =
+ apr_pstrcat(serf_bucket_allocator_get_pool(bucket->allocator),
+ root_url,
+ ctx->uri,
+ NULL);
+}
+
+static void serialize_data(serf_bucket_t *bucket)
+{
+ request_context_t *ctx = bucket->data;
+ serf_bucket_t *new_bucket;
+ const char *new_data;
+ struct iovec iov[4];
+ apr_size_t nbytes;
+
+ /* Serialize the request-line and headers into one mother string,
+ * and wrap a bucket around it.
+ */
+ iov[0].iov_base = (char*)ctx->method;
+ iov[0].iov_len = strlen(ctx->method);
+ iov[1].iov_base = " ";
+ iov[1].iov_len = sizeof(" ") - 1;
+ iov[2].iov_base = (char*)ctx->uri;
+ iov[2].iov_len = strlen(ctx->uri);
+ iov[3].iov_base = " HTTP/1.1\r\n";
+ iov[3].iov_len = sizeof(" HTTP/1.1\r\n") - 1;
+
+ /* ### pool allocation! */
+ new_data = apr_pstrcatv(serf_bucket_allocator_get_pool(bucket->allocator),
+ iov, 4, &nbytes);
+
+ /* Create a new bucket for this string. A free function isn't needed
+ * since the string is residing in a pool.
+ */
+ new_bucket = SERF_BUCKET_SIMPLE_STRING_LEN(new_data, nbytes,
+ bucket->allocator);
+
+ /* Build up the new bucket structure.
+ *
+ * Note that self needs to become an aggregate bucket so that a
+ * pointer to self still represents the "right" data.
+ */
+ serf_bucket_aggregate_become(bucket);
+
+ /* Insert the two buckets. */
+ serf_bucket_aggregate_append(bucket, new_bucket);
+ serf_bucket_aggregate_append(bucket, ctx->headers);
+
+ /* If we know the length, then use C-L and the raw body. Otherwise,
+ use chunked encoding for the request. */
+ if (ctx->len != LENGTH_UNKNOWN) {
+ char buf[30];
+ sprintf(buf, "%" APR_INT64_T_FMT, ctx->len);
+ serf_bucket_headers_set(ctx->headers, "Content-Length", buf);
+ if (ctx->body != NULL)
+ serf_bucket_aggregate_append(bucket, ctx->body);
+ }
+ else if (ctx->body != NULL) {
+ /* Morph the body bucket to a chunked encoding bucket for now. */
+ serf_bucket_headers_setn(ctx->headers, "Transfer-Encoding", "chunked");
+ ctx->body = serf_bucket_chunk_create(ctx->body, bucket->allocator);
+ serf_bucket_aggregate_append(bucket, ctx->body);
+ }
+
+ /* Our private context is no longer needed, and is not referred to by
+ * any existing bucket. Toss it.
+ */
+ serf_bucket_mem_free(bucket->allocator, ctx);
+}
+
+static apr_status_t serf_request_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+    /* Serialize our private data into a new aggregate bucket. */
+ serialize_data(bucket);
+
+ /* Delegate to the "new" aggregate bucket to do the read. */
+ return serf_bucket_read(bucket, requested, data, len);
+}
+
+static apr_status_t serf_request_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+    /* Serialize our private data into a new aggregate bucket. */
+ serialize_data(bucket);
+
+ /* Delegate to the "new" aggregate bucket to do the readline. */
+ return serf_bucket_readline(bucket, acceptable, found, data, len);
+}
+
+static apr_status_t serf_request_read_iovec(serf_bucket_t *bucket,
+ apr_size_t requested,
+ int vecs_size,
+ struct iovec *vecs,
+ int *vecs_used)
+{
+    /* Serialize our private data into a new aggregate bucket. */
+ serialize_data(bucket);
+
+ /* Delegate to the "new" aggregate bucket to do the read. */
+ return serf_bucket_read_iovec(bucket, requested,
+ vecs_size, vecs, vecs_used);
+}
+
+static apr_status_t serf_request_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+    /* Serialize our private data into a new aggregate bucket. */
+ serialize_data(bucket);
+
+ /* Delegate to the "new" aggregate bucket to do the peek. */
+ return serf_bucket_peek(bucket, data, len);
+}
+
+void serf_bucket_request_become(
+ serf_bucket_t *bucket,
+ const char *method,
+ const char *uri,
+ serf_bucket_t *body)
+{
+ request_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(bucket->allocator, sizeof(*ctx));
+ ctx->method = method;
+ ctx->uri = uri;
+ ctx->headers = serf_bucket_headers_create(bucket->allocator);
+    ctx->body = body;
+    ctx->len = LENGTH_UNKNOWN;  /* no Content-Length until set_CL() is called */
+
+ bucket->type = &serf_bucket_type_request;
+ bucket->data = ctx;
+
+ /* The allocator remains the same. */
+}
+
+const serf_bucket_type_t serf_bucket_type_request = {
+ "REQUEST",
+ serf_request_read,
+ serf_request_readline,
+ serf_request_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_request_peek,
+ serf_default_destroy_and_data,
+};
+
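
A sketch for the request bucket under the same allocator assumption.  With no
body and no explicit Content-Length, the first read serializes the
request-line and headers in place and the bucket behaves as an aggregate from
then on.

#include <stdio.h>
#include "serf.h"

static void dump_request(serf_bucket_alloc_t *alloc)
{
    serf_bucket_t *req;
    serf_bucket_t *hdrs;
    apr_status_t status;

    req = serf_bucket_request_create("GET", "/index.html",
                                     NULL /* no body */, alloc);
    hdrs = serf_bucket_request_get_headers(req);
    serf_bucket_headers_setc(hdrs, "Host", "example.org");

    /* Prints "GET /index.html HTTP/1.1\r\nHost: example.org\r\n\r\n". */
    do {
        const char *data;
        apr_size_t len;

        status = serf_bucket_read(req, SERF_READ_ALL_AVAIL, &data, &len);
        if (SERF_BUCKET_READ_ERROR(status))
            break;
        fwrite(data, 1, len, stdout);
    } while (!APR_STATUS_IS_EOF(status));

    serf_bucket_destroy(req);
}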
diff --git a/buckets/response_buckets.c b/buckets/response_buckets.c
new file mode 100644
index 0000000..975fedc
--- /dev/null
+++ b/buckets/response_buckets.c
@@ -0,0 +1,429 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_lib.h>
+#include <apr_strings.h>
+#include <apr_date.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+typedef struct {
+ serf_bucket_t *stream;
+ serf_bucket_t *body; /* Pointer to the stream wrapping the body. */
+ serf_bucket_t *headers; /* holds parsed headers */
+
+ enum {
+ STATE_STATUS_LINE, /* reading status line */
+ STATE_HEADERS, /* reading headers */
+ STATE_BODY, /* reading body */
+ STATE_TRAILERS, /* reading trailers */
+ STATE_DONE /* we've sent EOF */
+ } state;
+
+ /* Buffer for accumulating a line from the response. */
+ serf_linebuf_t linebuf;
+
+ serf_status_line sl;
+
+ int chunked; /* Do we need to read trailers? */
+ int head_req; /* Was this a HEAD request? */
+} response_context_t;
+
+
+serf_bucket_t *serf_bucket_response_create(
+ serf_bucket_t *stream,
+ serf_bucket_alloc_t *allocator)
+{
+ response_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->stream = stream;
+ ctx->body = NULL;
+ ctx->headers = serf_bucket_headers_create(allocator);
+ ctx->state = STATE_STATUS_LINE;
+ ctx->chunked = 0;
+ ctx->head_req = 0;
+
+ serf_linebuf_init(&ctx->linebuf);
+
+ return serf_bucket_create(&serf_bucket_type_response, allocator, ctx);
+}
+
+void serf_bucket_response_set_head(
+ serf_bucket_t *bucket)
+{
+ response_context_t *ctx = bucket->data;
+
+ ctx->head_req = 1;
+}
+
+serf_bucket_t *serf_bucket_response_get_headers(
+ serf_bucket_t *bucket)
+{
+ return ((response_context_t *)bucket->data)->headers;
+}
+
+
+static void serf_response_destroy_and_data(serf_bucket_t *bucket)
+{
+ response_context_t *ctx = bucket->data;
+
+ if (ctx->state != STATE_STATUS_LINE) {
+ serf_bucket_mem_free(bucket->allocator, (void*)ctx->sl.reason);
+ }
+
+ serf_bucket_destroy(ctx->stream);
+ if (ctx->body != NULL)
+ serf_bucket_destroy(ctx->body);
+ serf_bucket_destroy(ctx->headers);
+
+ serf_default_destroy_and_data(bucket);
+}
+
+static apr_status_t fetch_line(response_context_t *ctx, int acceptable)
+{
+ return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
+}
+
+static apr_status_t parse_status_line(response_context_t *ctx,
+ serf_bucket_alloc_t *allocator)
+{
+ int res;
+ char *reason; /* ### stupid APR interface makes this non-const */
+
+ /* ctx->linebuf.line should be of form: HTTP/1.1 200 OK */
+ res = apr_date_checkmask(ctx->linebuf.line, "HTTP/#.# ###*");
+ if (!res) {
+ /* Not an HTTP response? Well, at least we won't understand it. */
+ return SERF_ERROR_BAD_HTTP_RESPONSE;
+ }
+
+ ctx->sl.version = SERF_HTTP_VERSION(ctx->linebuf.line[5] - '0',
+ ctx->linebuf.line[7] - '0');
+ ctx->sl.code = apr_strtoi64(ctx->linebuf.line + 8, &reason, 10);
+
+ /* Skip leading spaces for the reason string. */
+ if (apr_isspace(*reason)) {
+ reason++;
+ }
+
+ /* Copy the reason value out of the line buffer. */
+ ctx->sl.reason = serf_bstrmemdup(allocator, reason,
+ ctx->linebuf.used
+ - (reason - ctx->linebuf.line));
+
+ return APR_SUCCESS;
+}
+
+/* This code should be replaced with header buckets. */
+static apr_status_t fetch_headers(serf_bucket_t *bkt, response_context_t *ctx)
+{
+ apr_status_t status;
+
+ /* RFC 2616 says that CRLF is the only line ending, but we can easily
+ * accept any kind of line ending.
+ */
+ status = fetch_line(ctx, SERF_NEWLINE_ANY);
+ if (SERF_BUCKET_READ_ERROR(status)) {
+ return status;
+ }
+ /* Something was read. Process it. */
+
+ if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
+ const char *end_key;
+ const char *c;
+
+ end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
+ if (!c) {
+ /* Bad headers? */
+ return SERF_ERROR_BAD_HTTP_RESPONSE;
+ }
+
+ /* Skip over initial ':' */
+ c++;
+
+ /* And skip all whitespaces. */
+ for(; c < ctx->linebuf.line + ctx->linebuf.used; c++)
+ {
+ if (!apr_isspace(*c))
+ {
+ break;
+ }
+ }
+
+ /* Always copy the headers (from the linebuf into new mem). */
+ /* ### we should be able to optimize some mem copies */
+ serf_bucket_headers_setx(
+ ctx->headers,
+ ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
+ c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
+ }
+
+ return status;
+}
+
+/* Perform one iteration of the state machine.
+ *
+ * Will return when one of the following conditions occurs:
+ * 1) a state change
+ * 2) an error
+ * 3) the stream is not ready or at EOF
+ * 4) APR_SUCCESS, meaning the machine can be run again immediately
+ */
+static apr_status_t run_machine(serf_bucket_t *bkt, response_context_t *ctx)
+{
+ apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
+
+ switch (ctx->state) {
+ case STATE_STATUS_LINE:
+ /* RFC 2616 says that CRLF is the only line ending, but we can easily
+ * accept any kind of line ending.
+ */
+ status = fetch_line(ctx, SERF_NEWLINE_ANY);
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ if (ctx->linebuf.state == SERF_LINEBUF_READY) {
+ /* The Status-Line is in the line buffer. Process it. */
+ status = parse_status_line(ctx, bkt->allocator);
+ if (status)
+ return status;
+
+ /* Good times ahead: we're switching protocols! */
+ if (ctx->sl.code == 101) {
+ ctx->body =
+ serf_bucket_barrier_create(ctx->stream, bkt->allocator);
+ ctx->state = STATE_DONE;
+ break;
+ }
+
+ /* Okay... move on to reading the headers. */
+ ctx->state = STATE_HEADERS;
+ }
+ else {
+ /* The connection closed before we could get the next
+ * response. Treat the request as lost so that our upper
+ * end knows the server never tried to give us a response.
+ */
+ if (APR_STATUS_IS_EOF(status)) {
+ return SERF_ERROR_REQUEST_LOST;
+ }
+ }
+ break;
+ case STATE_HEADERS:
+ status = fetch_headers(bkt, ctx);
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ /* If an empty line was read, then we hit the end of the headers.
+ * Move on to the body.
+ */
+ if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
+ const void *v;
+
+ /* Advance the state. */
+ ctx->state = STATE_BODY;
+
+ ctx->body =
+ serf_bucket_barrier_create(ctx->stream, bkt->allocator);
+
+ /* Are we C-L, chunked, or conn close? */
+ v = serf_bucket_headers_get(ctx->headers, "Content-Length");
+ if (v) {
+ apr_uint64_t length;
+ length = apr_strtoi64(v, NULL, 10);
+ if (errno == ERANGE) {
+ return APR_FROM_OS_ERROR(ERANGE);
+ }
+ ctx->body = serf_bucket_limit_create(ctx->body, length,
+ bkt->allocator);
+ }
+ else {
+ v = serf_bucket_headers_get(ctx->headers, "Transfer-Encoding");
+
+ /* Need to handle multiple transfer-encoding. */
+ if (v && strcasecmp("chunked", v) == 0) {
+ ctx->chunked = 1;
+ ctx->body = serf_bucket_dechunk_create(ctx->body,
+ bkt->allocator);
+ }
+
+ if (!v && (ctx->sl.code == 204 || ctx->sl.code == 304)) {
+ ctx->state = STATE_DONE;
+ }
+ }
+ v = serf_bucket_headers_get(ctx->headers, "Content-Encoding");
+ if (v) {
+ /* Need to handle multiple content-encoding. */
+ if (v && strcasecmp("gzip", v) == 0) {
+ ctx->body =
+ serf_bucket_deflate_create(ctx->body, bkt->allocator,
+ SERF_DEFLATE_GZIP);
+ }
+ else if (v && strcasecmp("deflate", v) == 0) {
+ ctx->body =
+ serf_bucket_deflate_create(ctx->body, bkt->allocator,
+ SERF_DEFLATE_DEFLATE);
+ }
+ }
+ /* If we're a HEAD request, we don't receive a body. */
+ if (ctx->head_req) {
+ ctx->state = STATE_DONE;
+ }
+ }
+ break;
+ case STATE_BODY:
+ /* Don't do anything. */
+ break;
+ case STATE_TRAILERS:
+ status = fetch_headers(bkt, ctx);
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ /* If an empty line was read, then we're done. */
+ if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
+ ctx->state = STATE_DONE;
+ return APR_EOF;
+ }
+ break;
+ case STATE_DONE:
+ return APR_EOF;
+ default:
+ /* Not reachable */
+ return APR_EGENERAL;
+ }
+
+ return status;
+}
+
+static apr_status_t wait_for_body(serf_bucket_t *bkt, response_context_t *ctx)
+{
+ apr_status_t status;
+
+ /* Keep reading and moving through states if we aren't at the BODY */
+ while (ctx->state != STATE_BODY) {
+ status = run_machine(bkt, ctx);
+
+ /* Anything other than APR_SUCCESS means that we cannot immediately
+ * read again (for now).
+ */
+ if (status)
+ return status;
+ }
+ /* in STATE_BODY */
+
+ return APR_SUCCESS;
+}
+
+apr_status_t serf_bucket_response_wait_for_headers(
+ serf_bucket_t *bucket)
+{
+ response_context_t *ctx = bucket->data;
+
+ return wait_for_body(bucket, ctx);
+}
+
+apr_status_t serf_bucket_response_status(
+ serf_bucket_t *bkt,
+ serf_status_line *sline)
+{
+ response_context_t *ctx = bkt->data;
+ apr_status_t status;
+
+ if (ctx->state != STATE_STATUS_LINE) {
+ /* We already read it and moved on. Just return it. */
+ *sline = ctx->sl;
+ return APR_SUCCESS;
+ }
+
+    /* Running the state machine once will either advance the machine or
+     * report that the stream doesn't have enough data yet. There is never
+     * a need to run the machine more than once to satisfy this request. We
+     * do have to look at the state to tell whether it advanced, though, as
+     * it is quite possible to advance *and* to return APR_EAGAIN.
+     */
+ status = run_machine(bkt, ctx);
+ if (ctx->state == STATE_HEADERS) {
+ *sline = ctx->sl;
+ }
+ else {
+ /* Indicate that we don't have the information yet. */
+ sline->version = 0;
+ }
+
+ return status;
+}
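+
+/* Illustrative caller-side sketch (not part of this file; resp is assumed
+ * to be a RESPONSE bucket owned by the caller): poll the status line and
+ * treat sline.version == 0 as "not known yet".
+ *
+ *     serf_status_line sline;
+ *     apr_status_t rv = serf_bucket_response_status(resp, &sline);
+ *
+ *     if (!SERF_BUCKET_READ_ERROR(rv) && sline.version != 0) {
+ *         ... sline.code and sline.reason describe the response ...
+ *     }
+ *     else if (APR_STATUS_IS_EAGAIN(rv)) {
+ *         ... not enough data buffered yet; call again later ...
+ *     }
+ */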
+
+static apr_status_t serf_response_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ response_context_t *ctx = bucket->data;
+ apr_status_t rv;
+
+ rv = wait_for_body(bucket, ctx);
+ if (rv) {
+ /* It's not possible to have read anything yet! */
+ if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
+ *len = 0;
+ }
+ return rv;
+ }
+
+ rv = serf_bucket_read(ctx->body, requested, data, len);
+ if (APR_STATUS_IS_EOF(rv)) {
+ if (ctx->chunked) {
+ ctx->state = STATE_TRAILERS;
+ /* Mask the result. */
+ rv = APR_SUCCESS;
+ }
+ else {
+ ctx->state = STATE_DONE;
+ }
+ }
+ return rv;
+}
+
+static apr_status_t serf_response_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ response_context_t *ctx = bucket->data;
+ apr_status_t rv;
+
+ rv = wait_for_body(bucket, ctx);
+ if (rv) {
+ return rv;
+ }
+
+ /* Delegate to the stream bucket to do the readline. */
+ return serf_bucket_readline(ctx->body, acceptable, found, data, len);
+}
+
+/* ### need to implement */
+#define serf_response_peek NULL
+
+const serf_bucket_type_t serf_bucket_type_response = {
+ "RESPONSE",
+ serf_response_read,
+ serf_response_readline,
+ serf_default_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_response_peek,
+ serf_response_destroy_and_data,
+};
diff --git a/buckets/simple_buckets.c b/buckets/simple_buckets.c
new file mode 100644
index 0000000..f36239b
--- /dev/null
+++ b/buckets/simple_buckets.c
@@ -0,0 +1,142 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_pools.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+typedef struct {
+ const char *original;
+ const char *current;
+ apr_size_t remaining;
+
+ serf_simple_freefunc_t freefunc;
+ void *baton;
+
+} simple_context_t;
+
+
+static void free_copied_data(void *baton, const char *data)
+{
+ serf_bucket_mem_free(baton, (char*)data);
+}
+
+serf_bucket_t *serf_bucket_simple_create(
+ const char *data,
+ apr_size_t len,
+ serf_simple_freefunc_t freefunc,
+ void *freefunc_baton,
+ serf_bucket_alloc_t *allocator)
+{
+ simple_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->original = ctx->current = data;
+ ctx->remaining = len;
+ ctx->freefunc = freefunc;
+ ctx->baton = freefunc_baton;
+
+ return serf_bucket_create(&serf_bucket_type_simple, allocator, ctx);
+}
+
+serf_bucket_t *serf_bucket_simple_copy_create(
+ const char *data, apr_size_t len,
+ serf_bucket_alloc_t *allocator)
+{
+ simple_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+
+ ctx->original = ctx->current = serf_bucket_mem_alloc(allocator, len);
+ memcpy((char*)ctx->original, data, len);
+
+ ctx->remaining = len;
+ ctx->freefunc = free_copied_data;
+ ctx->baton = allocator;
+
+ return serf_bucket_create(&serf_bucket_type_simple, allocator, ctx);
+}
+
+static apr_status_t serf_simple_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ simple_context_t *ctx = bucket->data;
+
+ if (requested == SERF_READ_ALL_AVAIL || requested > ctx->remaining)
+ requested = ctx->remaining;
+
+ *data = ctx->current;
+ *len = requested;
+
+ ctx->current += requested;
+ ctx->remaining -= requested;
+
+ return ctx->remaining ? APR_SUCCESS : APR_EOF;
+}
+
+static apr_status_t serf_simple_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ simple_context_t *ctx = bucket->data;
+
+ /* Returned data will be from current position. */
+ *data = ctx->current;
+ serf_util_readline(&ctx->current, &ctx->remaining, acceptable, found);
+
+ /* See how much ctx->current moved forward. */
+ *len = ctx->current - *data;
+
+ return ctx->remaining ? APR_SUCCESS : APR_EOF;
+}
+
+static apr_status_t serf_simple_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+ simple_context_t *ctx = bucket->data;
+
+ /* return whatever we have left */
+ *data = ctx->current;
+ *len = ctx->remaining;
+
+ /* we returned everything this bucket will ever hold */
+ return APR_EOF;
+}
+
+static void serf_simple_destroy(serf_bucket_t *bucket)
+{
+ simple_context_t *ctx = bucket->data;
+
+ if (ctx->freefunc)
+ (*ctx->freefunc)(ctx->baton, ctx->original);
+
+ serf_default_destroy_and_data(bucket);
+}
+
+
+const serf_bucket_type_t serf_bucket_type_simple = {
+ "SIMPLE",
+ serf_simple_read,
+ serf_simple_readline,
+ serf_default_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_simple_peek,
+ serf_simple_destroy,
+};
diff --git a/buckets/socket_buckets.c b/buckets/socket_buckets.c
new file mode 100644
index 0000000..dd2469a
--- /dev/null
+++ b/buckets/socket_buckets.c
@@ -0,0 +1,114 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_pools.h>
+#include <apr_network_io.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+
+typedef struct {
+ apr_socket_t *skt;
+
+ serf_databuf_t databuf;
+
+ /* Progress callback */
+ serf_progress_t progress_func;
+ void *progress_baton;
+} socket_context_t;
+
+
+static apr_status_t socket_reader(void *baton, apr_size_t bufsize,
+ char *buf, apr_size_t *len)
+{
+ socket_context_t *ctx = baton;
+ apr_status_t status;
+
+ *len = bufsize;
+ status = apr_socket_recv(ctx->skt, buf, len);
+
+ if (ctx->progress_func)
+ ctx->progress_func(ctx->progress_baton, *len, 0);
+
+ return status;
+}
+
+serf_bucket_t *serf_bucket_socket_create(
+ apr_socket_t *skt,
+ serf_bucket_alloc_t *allocator)
+{
+ socket_context_t *ctx;
+
+ /* Oh, well. */
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->skt = skt;
+
+ serf_databuf_init(&ctx->databuf);
+ ctx->databuf.read = socket_reader;
+ ctx->databuf.read_baton = ctx;
+
+    ctx->progress_func = NULL;
+    ctx->progress_baton = NULL;
+ return serf_bucket_create(&serf_bucket_type_socket, allocator, ctx);
+}
+
+void serf_bucket_socket_set_read_progress_cb(
+ serf_bucket_t *bucket,
+ const serf_progress_t progress_func,
+ void *progress_baton)
+{
+ socket_context_t *ctx = bucket->data;
+
+ ctx->progress_func = progress_func;
+ ctx->progress_baton = progress_baton;
+}
+
+static apr_status_t serf_socket_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ socket_context_t *ctx = bucket->data;
+
+ return serf_databuf_read(&ctx->databuf, requested, data, len);
+}
+
+static apr_status_t serf_socket_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data, apr_size_t *len)
+{
+ socket_context_t *ctx = bucket->data;
+
+ return serf_databuf_readline(&ctx->databuf, acceptable, found, data, len);
+}
+
+static apr_status_t serf_socket_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+ socket_context_t *ctx = bucket->data;
+
+ return serf_databuf_peek(&ctx->databuf, data, len);
+}
+
+const serf_bucket_type_t serf_bucket_type_socket = {
+ "SOCKET",
+ serf_socket_read,
+ serf_socket_readline,
+ serf_default_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_socket_peek,
+ serf_default_destroy_and_data,
+};
diff --git a/buckets/ssl_buckets.c b/buckets/ssl_buckets.c
new file mode 100644
index 0000000..3f43543
--- /dev/null
+++ b/buckets/ssl_buckets.c
@@ -0,0 +1,1629 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ----
+ *
+ * For the OpenSSL thread-safety locking code:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Originally developed by Aaron Bannert and Justin Erenkrantz, eBuilt.
+ */
+
+#include <apr_pools.h>
+#include <apr_network_io.h>
+#include <apr_portable.h>
+#include <apr_strings.h>
+#include <apr_base64.h>
+#include <apr_version.h>
+#include <apr_atomic.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+#include <openssl/bio.h>
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+#include <openssl/pkcs12.h>
+#include <openssl/x509v3.h>
+
+#ifndef APR_VERSION_AT_LEAST /* Introduced in APR 1.3.0 */
+#define APR_VERSION_AT_LEAST(major,minor,patch) \
+ (((major) < APR_MAJOR_VERSION) \
+ || ((major) == APR_MAJOR_VERSION && (minor) < APR_MINOR_VERSION) \
+ || ((major) == APR_MAJOR_VERSION && (minor) == APR_MINOR_VERSION && \
+ (patch) <= APR_PATCH_VERSION))
+#endif /* APR_VERSION_AT_LEAST */
+
+#ifndef APR_ARRAY_PUSH
+#define APR_ARRAY_PUSH(ary,type) (*((type *)apr_array_push(ary)))
+#endif
+
+
+/*#define SSL_VERBOSE*/
+
+/*
+ * Here's an overview of the SSL bucket's relationship to OpenSSL and serf.
+ *
+ * HTTP request: SSLENCRYPT(REQUEST)
+ * [context.c reads from SSLENCRYPT and writes out to the socket]
+ * HTTP response: RESPONSE(SSLDECRYPT(SOCKET))
+ * [handler function reads from RESPONSE which in turn reads from SSLDECRYPT]
+ *
+ * HTTP request read call path:
+ *
+ * write_to_connection
+ * |- serf_bucket_read on SSLENCRYPT
+ * |- serf_ssl_read
+ * |- serf_databuf_read
+ * |- common_databuf_prep
+ * |- ssl_encrypt
+ * |- 1. Try to read pending encrypted data; If available, return.
+ * |- 2. Try to read from ctx->stream [REQUEST bucket]
+ * |- 3. Call SSL_write with read data
+ * |- ...
+ * |- bio_bucket_read can be called
+ * |- bio_bucket_write with encrypted data
+ * |- store in sink
+ * |- 4. If successful, read pending encrypted data and return.
+ * |- 5. If fails, place read data back in ctx->stream
+ *
+ * HTTP response read call path:
+ *
+ * read_from_connection
+ * |- acceptor
+ * |- handler
+ * |- ...
+ * |- serf_bucket_read(SSLDECRYPT)
+ * |- serf_ssl_read
+ * |- serf_databuf_read
+ * |- ssl_decrypt
+ * |- 1. SSL_read() for pending decrypted data; if any, return.
+ * |- 2. Try to read from ctx->stream [SOCKET bucket]
+ * |- 3. Append data to ssl_ctx->source
+ * |- 4. Call SSL_read()
+ * |- ...
+ * |- bio_bucket_write can be called
+ * |- bio_bucket_read
+ * |- read data from ssl_ctx->source
+ * |- If data read, return it.
+ * |- If an error, set the STATUS value and return.
+ *
+ */
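+
+/* A minimal wiring sketch (illustrative only, not part of this file's API;
+ * the names skt_bkt, req_bkt and alloc are assumed to come from the
+ * caller's connection setup). The decrypt bucket is created first so that
+ * its context can be shared with the encrypt bucket:
+ *
+ *     serf_bucket_t *decrypt, *encrypt;
+ *     serf_ssl_context_t *ssl_ctx;
+ *
+ *     decrypt = serf_bucket_ssl_decrypt_create(skt_bkt, NULL, alloc);
+ *     ssl_ctx = serf_bucket_ssl_decrypt_context_get(decrypt);
+ *     encrypt = serf_bucket_ssl_encrypt_create(req_bkt, ssl_ctx, alloc);
+ *
+ * Reading from encrypt yields ciphertext destined for the socket; reading
+ * from decrypt yields the decrypted response bytes.
+ */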
+
+typedef struct bucket_list {
+ serf_bucket_t *bucket;
+ struct bucket_list *next;
+} bucket_list_t;
+
+typedef struct {
+ /* Helper to read data. Wraps stream. */
+ serf_databuf_t databuf;
+
+ /* Our source for more data. */
+ serf_bucket_t *stream;
+
+ /* The next set of buckets */
+ bucket_list_t *stream_next;
+
+ /* The status of the last thing we read. */
+ apr_status_t status;
+ apr_status_t exhausted;
+ int exhausted_reset;
+
+ /* Data we've read but not processed. */
+ serf_bucket_t *pending;
+} serf_ssl_stream_t;
+
+struct serf_ssl_context_t {
+ /* How many open buckets refer to this context. */
+ int refcount;
+
+ /* The pool that this context uses. */
+ apr_pool_t *pool;
+
+ /* The allocator associated with the above pool. */
+ serf_bucket_alloc_t *allocator;
+
+ /* Internal OpenSSL parameters */
+ SSL_CTX *ctx;
+ SSL *ssl;
+ BIO *bio;
+
+ serf_ssl_stream_t encrypt;
+ serf_ssl_stream_t decrypt;
+
+ /* Client cert callbacks */
+ serf_ssl_need_client_cert_t cert_callback;
+ void *cert_userdata;
+ apr_pool_t *cert_cache_pool;
+ const char *cert_file_success;
+
+ /* Client cert PW callbacks */
+ serf_ssl_need_cert_password_t cert_pw_callback;
+ void *cert_pw_userdata;
+ apr_pool_t *cert_pw_cache_pool;
+ const char *cert_pw_success;
+
+ /* Server cert callbacks */
+ serf_ssl_need_server_cert_t server_cert_callback;
+ serf_ssl_server_cert_chain_cb_t server_cert_chain_callback;
+ void *server_cert_userdata;
+
+ const char *cert_path;
+
+ X509 *cached_cert;
+ EVP_PKEY *cached_cert_pw;
+
+ apr_status_t pending_err;
+};
+
+typedef struct {
+ /* The bucket-independent ssl context that this bucket is associated with */
+ serf_ssl_context_t *ssl_ctx;
+
+ /* Pointer to the 'right' databuf. */
+ serf_databuf_t *databuf;
+
+ /* Pointer to our stream, so we can find it later. */
+ serf_bucket_t **our_stream;
+} ssl_context_t;
+
+struct serf_ssl_certificate_t {
+ X509 *ssl_cert;
+ int depth;
+};
+
+/* Returns the amount read. */
+static int bio_bucket_read(BIO *bio, char *in, int inlen)
+{
+ serf_ssl_context_t *ctx = bio->ptr;
+ const char *data;
+ apr_status_t status;
+ apr_size_t len;
+
+#ifdef SSL_VERBOSE
+ printf("bio_bucket_read called for %d bytes\n", inlen);
+#endif
+
+ if (ctx->encrypt.status == SERF_ERROR_WAIT_CONN
+ && BIO_should_read(ctx->bio)) {
+#ifdef SSL_VERBOSE
+ printf("bio_bucket_read waiting: (%d %d %d)\n",
+ BIO_should_retry(ctx->bio), BIO_should_read(ctx->bio),
+ BIO_get_retry_flags(ctx->bio));
+#endif
+ /* Falling back... */
+ ctx->encrypt.exhausted_reset = 1;
+ BIO_clear_retry_flags(bio);
+ }
+
+ status = serf_bucket_read(ctx->decrypt.pending, inlen, &data, &len);
+
+ ctx->decrypt.status = status;
+#ifdef SSL_VERBOSE
+ printf("bio_bucket_read received %d bytes (%d)\n", len, status);
+#endif
+
+ if (!SERF_BUCKET_READ_ERROR(status)) {
+ /* Oh suck. */
+ if (len) {
+ memcpy(in, data, len);
+ return len;
+ }
+ if (APR_STATUS_IS_EOF(status)) {
+ BIO_set_retry_read(bio);
+ return -1;
+ }
+ }
+
+ return -1;
+}
+
+/* Returns the amount written. */
+static int bio_bucket_write(BIO *bio, const char *in, int inl)
+{
+ serf_ssl_context_t *ctx = bio->ptr;
+ serf_bucket_t *tmp;
+
+#ifdef SSL_VERBOSE
+ printf("bio_bucket_write called for %d bytes\n", inl);
+#endif
+ if (ctx->encrypt.status == SERF_ERROR_WAIT_CONN
+ && !BIO_should_read(ctx->bio)) {
+#ifdef SSL_VERBOSE
+ printf("bio_bucket_write waiting: (%d %d %d)\n",
+ BIO_should_retry(ctx->bio), BIO_should_read(ctx->bio),
+ BIO_get_retry_flags(ctx->bio));
+#endif
+ /* Falling back... */
+ ctx->encrypt.exhausted_reset = 1;
+ BIO_clear_retry_flags(bio);
+ }
+
+ tmp = serf_bucket_simple_copy_create(in, inl,
+ ctx->encrypt.pending->allocator);
+
+ serf_bucket_aggregate_append(ctx->encrypt.pending, tmp);
+
+ return inl;
+}
+
+/* Returns the amount read. */
+static int bio_file_read(BIO *bio, char *in, int inlen)
+{
+ apr_file_t *file = bio->ptr;
+ apr_status_t status;
+ apr_size_t len;
+
+ BIO_clear_retry_flags(bio);
+
+ len = inlen;
+ status = apr_file_read(file, in, &len);
+
+ if (!SERF_BUCKET_READ_ERROR(status)) {
+ /* Oh suck. */
+ if (APR_STATUS_IS_EOF(status)) {
+ BIO_set_retry_read(bio);
+ return -1;
+ } else {
+ return len;
+ }
+ }
+
+ return -1;
+}
+
+/* Returns the amount written. */
+static int bio_file_write(BIO *bio, const char *in, int inl)
+{
+ apr_file_t *file = bio->ptr;
+ apr_size_t nbytes;
+
+ BIO_clear_retry_flags(bio);
+
+ nbytes = inl;
+ apr_file_write(file, in, &nbytes);
+
+ return nbytes;
+}
+
+static int bio_file_gets(BIO *bio, char *in, int inlen)
+{
+ return bio_file_read(bio, in, inlen);
+}
+
+static int bio_bucket_create(BIO *bio)
+{
+ bio->shutdown = 1;
+ bio->init = 1;
+ bio->num = -1;
+ bio->ptr = NULL;
+
+ return 1;
+}
+
+static int bio_bucket_destroy(BIO *bio)
+{
+ /* Did we already free this? */
+ if (bio == NULL) {
+ return 0;
+ }
+
+ return 1;
+}
+
+static long bio_bucket_ctrl(BIO *bio, int cmd, long num, void *ptr)
+{
+ long ret = 1;
+
+ switch (cmd) {
+ default:
+ /* abort(); */
+ break;
+ case BIO_CTRL_FLUSH:
+ /* At this point we can't force a flush. */
+ break;
+ case BIO_CTRL_PUSH:
+ case BIO_CTRL_POP:
+ ret = 0;
+ break;
+ }
+ return ret;
+}
+
+static BIO_METHOD bio_bucket_method = {
+ BIO_TYPE_MEM,
+ "Serf SSL encryption and decryption buckets",
+ bio_bucket_write,
+ bio_bucket_read,
+ NULL, /* Is this called? */
+ NULL, /* Is this called? */
+ bio_bucket_ctrl,
+ bio_bucket_create,
+ bio_bucket_destroy,
+#ifdef OPENSSL_VERSION_NUMBER
+ NULL /* sslc does not have the callback_ctrl field */
+#endif
+};
+
+static BIO_METHOD bio_file_method = {
+ BIO_TYPE_FILE,
+ "Wrapper around APR file structures",
+ bio_file_write,
+ bio_file_read,
+ NULL, /* Is this called? */
+ bio_file_gets, /* Is this called? */
+ bio_bucket_ctrl,
+ bio_bucket_create,
+ bio_bucket_destroy,
+#ifdef OPENSSL_VERSION_NUMBER
+ NULL /* sslc does not have the callback_ctrl field */
+#endif
+};
+
+static int
+validate_server_certificate(int cert_valid, X509_STORE_CTX *store_ctx)
+{
+ SSL *ssl;
+ serf_ssl_context_t *ctx;
+ X509 *server_cert;
+ int err, depth;
+ int failures = 0;
+
+ ssl = X509_STORE_CTX_get_ex_data(store_ctx,
+ SSL_get_ex_data_X509_STORE_CTX_idx());
+ ctx = SSL_get_app_data(ssl);
+
+ server_cert = X509_STORE_CTX_get_current_cert(store_ctx);
+ depth = X509_STORE_CTX_get_error_depth(store_ctx);
+
+    /* If the certificate was found invalid, get the error and convert it to
+       something our caller will understand. */
+ if (! cert_valid) {
+ err = X509_STORE_CTX_get_error(store_ctx);
+
+ switch(err) {
+ case X509_V_ERR_CERT_NOT_YET_VALID:
+ failures |= SERF_SSL_CERT_NOTYETVALID;
+ break;
+ case X509_V_ERR_CERT_HAS_EXPIRED:
+ failures |= SERF_SSL_CERT_EXPIRED;
+ break;
+ case X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT:
+ case X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN:
+ failures |= SERF_SSL_CERT_SELF_SIGNED;
+ break;
+ case X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY:
+ case X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT:
+ case X509_V_ERR_CERT_UNTRUSTED:
+ case X509_V_ERR_INVALID_CA:
+ failures |= SERF_SSL_CERT_UNKNOWNCA;
+ break;
+ default:
+ failures |= SERF_SSL_CERT_UNKNOWN_FAILURE;
+ break;
+ }
+ }
+
+ /* Check certificate expiry dates. */
+ if (X509_cmp_current_time(X509_get_notBefore(server_cert)) >= 0) {
+ failures |= SERF_SSL_CERT_NOTYETVALID;
+ }
+ else if (X509_cmp_current_time(X509_get_notAfter(server_cert)) <= 0) {
+ failures |= SERF_SSL_CERT_EXPIRED;
+ }
+
+ if (ctx->server_cert_callback &&
+ (depth == 0 || failures)) {
+ apr_status_t status;
+ serf_ssl_certificate_t *cert;
+ apr_pool_t *subpool;
+
+ apr_pool_create(&subpool, ctx->pool);
+
+ cert = apr_palloc(subpool, sizeof(serf_ssl_certificate_t));
+ cert->ssl_cert = server_cert;
+ cert->depth = depth;
+
+ /* Callback for further verification. */
+ status = ctx->server_cert_callback(ctx->server_cert_userdata,
+ failures, cert);
+ if (status == APR_SUCCESS)
+ cert_valid = 1;
+ else
+ /* Pass the error back to the caller through the context-run. */
+ ctx->pending_err = status;
+ apr_pool_destroy(subpool);
+ }
+
+ if (ctx->server_cert_chain_callback
+ && (depth == 0 || failures)) {
+ apr_status_t status;
+ STACK_OF(X509) *chain;
+ const serf_ssl_certificate_t **certs;
+ int certs_len;
+ apr_pool_t *subpool;
+
+ apr_pool_create(&subpool, ctx->pool);
+
+ /* Borrow the chain to pass to the callback. */
+ chain = X509_STORE_CTX_get_chain(store_ctx);
+
+ /* If the chain can't be retrieved, just pass the current
+ certificate. */
+ /* ### can this actually happen with _get_chain() ? */
+ if (!chain) {
+ serf_ssl_certificate_t *cert = apr_palloc(subpool, sizeof(*cert));
+
+ cert->ssl_cert = server_cert;
+ cert->depth = depth;
+
+ /* Room for the server_cert and a trailing NULL. */
+ certs = apr_palloc(subpool, sizeof(*certs) * 2);
+ certs[0] = cert;
+
+ certs_len = 1;
+ } else {
+ int i;
+
+ certs_len = sk_X509_num(chain);
+
+ /* Room for all the certs and a trailing NULL. */
+ certs = apr_palloc(subpool, sizeof(*certs) * (certs_len + 1));
+ for (i = 0; i < certs_len; ++i) {
+ serf_ssl_certificate_t *cert;
+
+ cert = apr_palloc(subpool, sizeof(*cert));
+ cert->ssl_cert = sk_X509_value(chain, i);
+ cert->depth = i;
+
+ certs[i] = cert;
+ }
+ }
+ certs[certs_len] = NULL;
+
+ /* Callback for further verification. */
+ status = ctx->server_cert_chain_callback(ctx->server_cert_userdata,
+ failures, depth,
+ certs, certs_len);
+ if (status == APR_SUCCESS) {
+ cert_valid = 1;
+ } else {
+ /* Pass the error back to the caller through the context-run. */
+ ctx->pending_err = status;
+ }
+
+ apr_pool_destroy(subpool);
+ }
+
+ return cert_valid;
+}
+
+/* This function reads an encrypted stream and returns the decrypted stream. */
+static apr_status_t ssl_decrypt(void *baton, apr_size_t bufsize,
+ char *buf, apr_size_t *len)
+{
+ serf_ssl_context_t *ctx = baton;
+ apr_size_t priv_len;
+ apr_status_t status;
+ const char *data;
+ int ssl_len;
+
+#ifdef SSL_VERBOSE
+ printf("ssl_decrypt: begin %d\n", bufsize);
+#endif
+
+ /* Is there some data waiting to be read? */
+ ssl_len = SSL_read(ctx->ssl, buf, bufsize);
+ if (ssl_len > 0) {
+#ifdef SSL_VERBOSE
+ printf("ssl_decrypt: %d bytes (%d); status: %d; flags: %d\n",
+ ssl_len, bufsize, ctx->decrypt.status,
+ BIO_get_retry_flags(ctx->bio));
+#endif
+ *len = ssl_len;
+ return APR_SUCCESS;
+ }
+
+ status = serf_bucket_read(ctx->decrypt.stream, bufsize, &data, &priv_len);
+
+ if (!SERF_BUCKET_READ_ERROR(status) && priv_len) {
+ serf_bucket_t *tmp;
+
+#ifdef SSL_VERBOSE
+ printf("ssl_decrypt: read %d bytes (%d); status: %d\n", priv_len,
+ bufsize, status);
+#endif
+
+ tmp = serf_bucket_simple_copy_create(data, priv_len,
+ ctx->decrypt.pending->allocator);
+
+ serf_bucket_aggregate_append(ctx->decrypt.pending, tmp);
+
+ ssl_len = SSL_read(ctx->ssl, buf, bufsize);
+ if (ssl_len < 0) {
+ int ssl_err;
+
+ ssl_err = SSL_get_error(ctx->ssl, ssl_len);
+ switch (ssl_err) {
+ case SSL_ERROR_SYSCALL:
+ *len = 0;
+ status = ctx->decrypt.status;
+ break;
+ case SSL_ERROR_WANT_READ:
+ *len = 0;
+ status = APR_EAGAIN;
+ break;
+ case SSL_ERROR_SSL:
+ *len = 0;
+ status = ctx->pending_err ? ctx->pending_err : APR_EGENERAL;
+ ctx->pending_err = 0;
+ break;
+ default:
+ *len = 0;
+ status = APR_EGENERAL;
+ break;
+ }
+ }
+ else {
+ *len = ssl_len;
+#ifdef SSL_VERBOSE
+ printf("---\n%s\n-(%d)-\n", buf, *len);
+#endif
+ }
+ }
+ else {
+ *len = 0;
+ }
+#ifdef SSL_VERBOSE
+ printf("ssl_decrypt: %d %d %d\n", status, *len,
+ BIO_get_retry_flags(ctx->bio));
+#endif
+ return status;
+}
+
+/* This function reads a decrypted stream and returns an encrypted stream. */
+static apr_status_t ssl_encrypt(void *baton, apr_size_t bufsize,
+ char *buf, apr_size_t *len)
+{
+ const char *data;
+ apr_size_t interim_bufsize;
+ serf_ssl_context_t *ctx = baton;
+ apr_status_t status;
+
+#ifdef SSL_VERBOSE
+ printf("ssl_encrypt: begin %d\n", bufsize);
+#endif
+
+ /* Try to read already encrypted but unread data first. */
+ status = serf_bucket_read(ctx->encrypt.pending, bufsize, &data, len);
+ if (SERF_BUCKET_READ_ERROR(status)) {
+ return status;
+ }
+
+ /* Aha, we read something. Return that now. */
+ if (*len) {
+ memcpy(buf, data, *len);
+ if (APR_STATUS_IS_EOF(status)) {
+ status = APR_SUCCESS;
+ }
+#ifdef SSL_VERBOSE
+ printf("ssl_encrypt: %d %d %d (quick read)\n", status, *len,
+ BIO_get_retry_flags(ctx->bio));
+#endif
+ return status;
+ }
+
+ if (BIO_should_retry(ctx->bio) && BIO_should_write(ctx->bio)) {
+#ifdef SSL_VERBOSE
+ printf("ssl_encrypt: %d %d %d (should write exit)\n", status, *len,
+ BIO_get_retry_flags(ctx->bio));
+#endif
+ return APR_EAGAIN;
+ }
+
+ /* If we were previously blocked, unblock ourselves now. */
+ if (BIO_should_read(ctx->bio)) {
+#ifdef SSL_VERBOSE
+ printf("ssl_encrypt: reset %d %d (%d %d %d)\n", status,
+ ctx->encrypt.status,
+ BIO_should_retry(ctx->bio), BIO_should_read(ctx->bio),
+ BIO_get_retry_flags(ctx->bio));
+#endif
+ ctx->encrypt.status = APR_SUCCESS;
+ ctx->encrypt.exhausted_reset = 0;
+ }
+
+ /* Oh well, read from our stream now. */
+ interim_bufsize = bufsize;
+ do {
+ apr_size_t interim_len;
+
+ if (!ctx->encrypt.status) {
+ struct iovec vecs[64];
+ int vecs_read;
+
+ status = serf_bucket_read_iovec(ctx->encrypt.stream,
+ interim_bufsize, 64, vecs,
+ &vecs_read);
+
+ if (!SERF_BUCKET_READ_ERROR(status) && vecs_read) {
+ char *vecs_data;
+ int i, cur, vecs_data_len;
+ int ssl_len;
+
+                /* Combine the buffers of the iovec into one buffer, as
+                   that is what SSL_write requires. */
+ vecs_data_len = 0;
+ for (i = 0; i < vecs_read; i++) {
+ vecs_data_len += vecs[i].iov_len;
+ }
+
+ vecs_data = serf_bucket_mem_alloc(ctx->allocator,
+ vecs_data_len);
+
+ cur = 0;
+ for (i = 0; i < vecs_read; i++) {
+ memcpy(vecs_data + cur, vecs[i].iov_base, vecs[i].iov_len);
+ cur += vecs[i].iov_len;
+ }
+
+ interim_bufsize -= vecs_data_len;
+ interim_len = vecs_data_len;
+
+#ifdef SSL_VERBOSE
+ printf("ssl_encrypt: bucket read %d bytes; status %d\n",
+ interim_len, status);
+ printf("---\n%s\n-(%d)-\n", vecs_data, interim_len);
+#endif
+ /* Stash our status away. */
+ ctx->encrypt.status = status;
+
+ ssl_len = SSL_write(ctx->ssl, vecs_data, interim_len);
+#ifdef SSL_VERBOSE
+ printf("ssl_encrypt: SSL write: %d\n", ssl_len);
+#endif
+ /* We're done. */
+ serf_bucket_mem_free(ctx->allocator, vecs_data);
+
+ /* If we failed to write... */
+ if (ssl_len < 0) {
+ int ssl_err;
+
+ /* Ah, bugger. We need to put that data back. */
+ serf_bucket_aggregate_prepend_iovec(ctx->encrypt.stream,
+ vecs, vecs_read);
+
+ ssl_err = SSL_get_error(ctx->ssl, ssl_len);
+#ifdef SSL_VERBOSE
+ printf("ssl_encrypt: SSL write error: %d\n", ssl_err);
+#endif
+ if (ssl_err == SSL_ERROR_SYSCALL) {
+ status = ctx->encrypt.status;
+ if (SERF_BUCKET_READ_ERROR(status)) {
+ return status;
+ }
+ }
+ else {
+ /* Oh, no. */
+ if (ssl_err == SSL_ERROR_WANT_READ) {
+ status = SERF_ERROR_WAIT_CONN;
+ }
+ else {
+ status = APR_EGENERAL;
+ }
+ }
+#ifdef SSL_VERBOSE
+ printf("ssl_encrypt: SSL write error: %d %d\n", status, *len);
+#endif
+ }
+ }
+ }
+ else {
+ interim_len = 0;
+ *len = 0;
+ status = ctx->encrypt.status;
+ }
+
+ } while (!status && interim_bufsize);
+
+ /* Okay, we exhausted our underlying stream. */
+ if (!SERF_BUCKET_READ_ERROR(status)) {
+ apr_status_t agg_status;
+ struct iovec vecs[64];
+ int vecs_read, i;
+
+ /* We read something! */
+ agg_status = serf_bucket_read_iovec(ctx->encrypt.pending, bufsize,
+ 64, vecs, &vecs_read);
+ *len = 0;
+ for (i = 0; i < vecs_read; i++) {
+ memcpy(buf + *len, vecs[i].iov_base, vecs[i].iov_len);
+ *len += vecs[i].iov_len;
+ }
+
+#ifdef SSL_VERBOSE
+ printf("ssl_encrypt read agg: %d %d %d %d\n", status, agg_status,
+ ctx->encrypt.status, *len);
+#endif
+
+ if (!agg_status) {
+ status = agg_status;
+ }
+ }
+
+ if (status == SERF_ERROR_WAIT_CONN
+ && BIO_should_retry(ctx->bio) && BIO_should_read(ctx->bio)) {
+ ctx->encrypt.exhausted = ctx->encrypt.status;
+ ctx->encrypt.status = SERF_ERROR_WAIT_CONN;
+ }
+
+#ifdef SSL_VERBOSE
+ printf("ssl_encrypt finished: %d %d (%d %d %d)\n", status, *len,
+ BIO_should_retry(ctx->bio), BIO_should_read(ctx->bio),
+ BIO_get_retry_flags(ctx->bio));
+#endif
+ return status;
+}
+
+#if APR_HAS_THREADS
+static apr_pool_t *ssl_pool;
+static apr_thread_mutex_t **ssl_locks;
+
+typedef struct CRYPTO_dynlock_value {
+ apr_thread_mutex_t *lock;
+} CRYPTO_dynlock_value;
+
+static CRYPTO_dynlock_value *ssl_dyn_create(const char* file, int line)
+{
+ CRYPTO_dynlock_value *l;
+ apr_status_t rv;
+
+ l = apr_palloc(ssl_pool, sizeof(CRYPTO_dynlock_value));
+ rv = apr_thread_mutex_create(&l->lock, APR_THREAD_MUTEX_DEFAULT, ssl_pool);
+ if (rv != APR_SUCCESS) {
+ /* FIXME: return error here */
+ }
+ return l;
+}
+
+static void ssl_dyn_lock(int mode, CRYPTO_dynlock_value *l, const char *file,
+ int line)
+{
+ if (mode & CRYPTO_LOCK) {
+ apr_thread_mutex_lock(l->lock);
+ }
+ else if (mode & CRYPTO_UNLOCK) {
+ apr_thread_mutex_unlock(l->lock);
+ }
+}
+
+static void ssl_dyn_destroy(CRYPTO_dynlock_value *l, const char *file,
+ int line)
+{
+ apr_thread_mutex_destroy(l->lock);
+}
+
+static void ssl_lock(int mode, int n, const char *file, int line)
+{
+ if (mode & CRYPTO_LOCK) {
+ apr_thread_mutex_lock(ssl_locks[n]);
+ }
+ else if (mode & CRYPTO_UNLOCK) {
+ apr_thread_mutex_unlock(ssl_locks[n]);
+ }
+}
+
+static unsigned long ssl_id(void)
+{
+ /* FIXME: This is lame and not portable. -aaron */
+ return (unsigned long) apr_os_thread_current();
+}
+
+static apr_status_t cleanup_ssl(void *data)
+{
+ CRYPTO_set_locking_callback(NULL);
+ CRYPTO_set_id_callback(NULL);
+ CRYPTO_set_dynlock_create_callback(NULL);
+ CRYPTO_set_dynlock_lock_callback(NULL);
+ CRYPTO_set_dynlock_destroy_callback(NULL);
+
+ return APR_SUCCESS;
+}
+
+#endif
+
+static apr_uint32_t have_init_ssl = 0;
+
+static void init_ssl_libraries(void)
+{
+ apr_uint32_t val;
+#if APR_VERSION_AT_LEAST(1,0,0)
+ val = apr_atomic_xchg32(&have_init_ssl, 1);
+#else
+ val = apr_atomic_cas(&have_init_ssl, 1, 0);
+#endif
+
+ if (!val) {
+#if APR_HAS_THREADS
+ int i, numlocks;
+#endif
+ CRYPTO_malloc_init();
+ ERR_load_crypto_strings();
+ SSL_load_error_strings();
+ SSL_library_init();
+ OpenSSL_add_all_algorithms();
+
+#if APR_HAS_THREADS
+ numlocks = CRYPTO_num_locks();
+ apr_pool_create(&ssl_pool, NULL);
+ ssl_locks = apr_palloc(ssl_pool, sizeof(apr_thread_mutex_t*)*numlocks);
+ for (i = 0; i < numlocks; i++) {
+ apr_status_t rv;
+
+ /* Intraprocess locks don't /need/ a filename... */
+ rv = apr_thread_mutex_create(&ssl_locks[i],
+ APR_THREAD_MUTEX_DEFAULT, ssl_pool);
+ if (rv != APR_SUCCESS) {
+ /* FIXME: error out here */
+ }
+ }
+ CRYPTO_set_locking_callback(ssl_lock);
+ CRYPTO_set_id_callback(ssl_id);
+ CRYPTO_set_dynlock_create_callback(ssl_dyn_create);
+ CRYPTO_set_dynlock_lock_callback(ssl_dyn_lock);
+ CRYPTO_set_dynlock_destroy_callback(ssl_dyn_destroy);
+
+ apr_pool_cleanup_register(ssl_pool, NULL, cleanup_ssl, cleanup_ssl);
+#endif
+ }
+}
+
+static int ssl_need_client_cert(SSL *ssl, X509 **cert, EVP_PKEY **pkey)
+{
+ serf_ssl_context_t *ctx = SSL_get_app_data(ssl);
+ apr_status_t status;
+
+ if (ctx->cached_cert) {
+ *cert = ctx->cached_cert;
+ *pkey = ctx->cached_cert_pw;
+ return 1;
+ }
+
+ while (ctx->cert_callback) {
+ const char *cert_path;
+ apr_file_t *cert_file;
+ BIO *bio;
+ PKCS12 *p12;
+ int i;
+ int retrying_success = 0;
+
+ if (ctx->cert_file_success) {
+ status = APR_SUCCESS;
+ cert_path = ctx->cert_file_success;
+ ctx->cert_file_success = NULL;
+ retrying_success = 1;
+ } else {
+ status = ctx->cert_callback(ctx->cert_userdata, &cert_path);
+ }
+
+ if (status || !cert_path) {
+ break;
+ }
+
+ /* Load the x.509 cert file stored in PKCS12 */
+ status = apr_file_open(&cert_file, cert_path, APR_READ, APR_OS_DEFAULT,
+ ctx->pool);
+
+ if (status) {
+ continue;
+ }
+
+ bio = BIO_new(&bio_file_method);
+ bio->ptr = cert_file;
+
+ ctx->cert_path = cert_path;
+ p12 = d2i_PKCS12_bio(bio, NULL);
+ apr_file_close(cert_file);
+
+ i = PKCS12_parse(p12, NULL, pkey, cert, NULL);
+
+ if (i == 1) {
+ PKCS12_free(p12);
+ ctx->cached_cert = *cert;
+ ctx->cached_cert_pw = *pkey;
+ if (!retrying_success && ctx->cert_cache_pool) {
+ const char *c;
+
+ c = apr_pstrdup(ctx->cert_cache_pool, ctx->cert_path);
+
+ apr_pool_userdata_setn(c, "serf:ssl:cert",
+ apr_pool_cleanup_null,
+ ctx->cert_cache_pool);
+ }
+ return 1;
+ }
+ else {
+ int err = ERR_get_error();
+ ERR_clear_error();
+ if (ERR_GET_LIB(err) == ERR_LIB_PKCS12 &&
+ ERR_GET_REASON(err) == PKCS12_R_MAC_VERIFY_FAILURE) {
+ if (ctx->cert_pw_callback) {
+ const char *password;
+
+ if (ctx->cert_pw_success) {
+ status = APR_SUCCESS;
+ password = ctx->cert_pw_success;
+ ctx->cert_pw_success = NULL;
+ } else {
+ status = ctx->cert_pw_callback(ctx->cert_pw_userdata,
+ ctx->cert_path,
+ &password);
+ }
+
+ if (!status && password) {
+ i = PKCS12_parse(p12, password, pkey, cert, NULL);
+ if (i == 1) {
+ PKCS12_free(p12);
+ ctx->cached_cert = *cert;
+ ctx->cached_cert_pw = *pkey;
+ if (!retrying_success && ctx->cert_cache_pool) {
+ const char *c;
+
+ c = apr_pstrdup(ctx->cert_cache_pool,
+ ctx->cert_path);
+
+ apr_pool_userdata_setn(c, "serf:ssl:cert",
+ apr_pool_cleanup_null,
+ ctx->cert_cache_pool);
+ }
+ if (!retrying_success && ctx->cert_pw_cache_pool) {
+ const char *c;
+
+ c = apr_pstrdup(ctx->cert_pw_cache_pool,
+ password);
+
+ apr_pool_userdata_setn(c, "serf:ssl:certpw",
+ apr_pool_cleanup_null,
+ ctx->cert_pw_cache_pool);
+ }
+ return 1;
+ }
+ }
+ }
+ PKCS12_free(p12);
+ return 0;
+ }
+ else {
+ printf("OpenSSL cert error: %d %d %d\n", ERR_GET_LIB(err),
+ ERR_GET_FUNC(err),
+ ERR_GET_REASON(err));
+ PKCS12_free(p12);
+ }
+ }
+ }
+
+ return 0;
+}
+
+
+void serf_ssl_client_cert_provider_set(
+ serf_ssl_context_t *context,
+ serf_ssl_need_client_cert_t callback,
+ void *data,
+ void *cache_pool)
+{
+ context->cert_callback = callback;
+ context->cert_userdata = data;
+ context->cert_cache_pool = cache_pool;
+ if (context->cert_cache_pool) {
+ apr_pool_userdata_get((void**)&context->cert_file_success,
+ "serf:ssl:cert", cache_pool);
+ }
+}
+
+
+void serf_ssl_client_cert_password_set(
+ serf_ssl_context_t *context,
+ serf_ssl_need_cert_password_t callback,
+ void *data,
+ void *cache_pool)
+{
+ context->cert_pw_callback = callback;
+ context->cert_pw_userdata = data;
+ context->cert_pw_cache_pool = cache_pool;
+ if (context->cert_pw_cache_pool) {
+ apr_pool_userdata_get((void**)&context->cert_pw_success,
+ "serf:ssl:certpw", cache_pool);
+ }
+}
+
+
+void serf_ssl_server_cert_callback_set(
+ serf_ssl_context_t *context,
+ serf_ssl_need_server_cert_t callback,
+ void *data)
+{
+ context->server_cert_callback = callback;
+ context->server_cert_userdata = data;
+}
+
+void serf_ssl_server_cert_chain_callback_set(
+ serf_ssl_context_t *context,
+ serf_ssl_need_server_cert_t cert_callback,
+ serf_ssl_server_cert_chain_cb_t cert_chain_callback,
+ void *data)
+{
+ context->server_cert_callback = cert_callback;
+ context->server_cert_chain_callback = cert_chain_callback;
+ context->server_cert_userdata = data;
+}
+
+static serf_ssl_context_t *ssl_init_context(void)
+{
+ serf_ssl_context_t *ssl_ctx;
+ apr_pool_t *pool;
+ serf_bucket_alloc_t *allocator;
+
+ init_ssl_libraries();
+
+ apr_pool_create(&pool, NULL);
+ allocator = serf_bucket_allocator_create(pool, NULL, NULL);
+
+ ssl_ctx = serf_bucket_mem_alloc(allocator, sizeof(*ssl_ctx));
+
+ ssl_ctx->refcount = 0;
+ ssl_ctx->pool = pool;
+ ssl_ctx->allocator = allocator;
+
+ ssl_ctx->ctx = SSL_CTX_new(SSLv23_client_method());
+
+ SSL_CTX_set_client_cert_cb(ssl_ctx->ctx, ssl_need_client_cert);
+ ssl_ctx->cached_cert = 0;
+ ssl_ctx->cached_cert_pw = 0;
+ ssl_ctx->pending_err = APR_SUCCESS;
+
+ ssl_ctx->cert_callback = NULL;
+ ssl_ctx->cert_pw_callback = NULL;
+ ssl_ctx->server_cert_callback = NULL;
+ ssl_ctx->server_cert_chain_callback = NULL;
+
+ SSL_CTX_set_verify(ssl_ctx->ctx, SSL_VERIFY_PEER,
+ validate_server_certificate);
+ SSL_CTX_set_options(ssl_ctx->ctx, SSL_OP_ALL);
+
+ ssl_ctx->ssl = SSL_new(ssl_ctx->ctx);
+ ssl_ctx->bio = BIO_new(&bio_bucket_method);
+ ssl_ctx->bio->ptr = ssl_ctx;
+
+ SSL_set_bio(ssl_ctx->ssl, ssl_ctx->bio, ssl_ctx->bio);
+
+ SSL_set_connect_state(ssl_ctx->ssl);
+
+ SSL_set_app_data(ssl_ctx->ssl, ssl_ctx);
+
+ ssl_ctx->encrypt.stream = NULL;
+ ssl_ctx->encrypt.stream_next = NULL;
+ ssl_ctx->encrypt.pending = serf_bucket_aggregate_create(allocator);
+ ssl_ctx->encrypt.status = APR_SUCCESS;
+ serf_databuf_init(&ssl_ctx->encrypt.databuf);
+ ssl_ctx->encrypt.databuf.read = ssl_encrypt;
+ ssl_ctx->encrypt.databuf.read_baton = ssl_ctx;
+
+ ssl_ctx->decrypt.stream = NULL;
+ ssl_ctx->decrypt.pending = serf_bucket_aggregate_create(allocator);
+ ssl_ctx->decrypt.status = APR_SUCCESS;
+ serf_databuf_init(&ssl_ctx->decrypt.databuf);
+ ssl_ctx->decrypt.databuf.read = ssl_decrypt;
+ ssl_ctx->decrypt.databuf.read_baton = ssl_ctx;
+
+ return ssl_ctx;
+}
+
+static apr_status_t ssl_free_context(
+ serf_ssl_context_t *ssl_ctx)
+{
+ apr_pool_t *p;
+
+    /* If we never had the pending buckets, don't try to free them. */
+ if (ssl_ctx->decrypt.pending != NULL) {
+ serf_bucket_destroy(ssl_ctx->decrypt.pending);
+ }
+ if (ssl_ctx->encrypt.pending != NULL) {
+ serf_bucket_destroy(ssl_ctx->encrypt.pending);
+ }
+
+ /* SSL_free implicitly frees the underlying BIO. */
+ SSL_free(ssl_ctx->ssl);
+ SSL_CTX_free(ssl_ctx->ctx);
+
+ p = ssl_ctx->pool;
+
+ serf_bucket_mem_free(ssl_ctx->allocator, ssl_ctx);
+ apr_pool_destroy(p);
+
+ return APR_SUCCESS;
+}
+
+static serf_bucket_t * serf_bucket_ssl_create(
+ serf_ssl_context_t *ssl_ctx,
+ serf_bucket_alloc_t *allocator,
+ const serf_bucket_type_t *type)
+{
+ ssl_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ if (!ssl_ctx) {
+ ctx->ssl_ctx = ssl_init_context();
+ }
+ else {
+ ctx->ssl_ctx = ssl_ctx;
+ }
+ ctx->ssl_ctx->refcount++;
+
+ return serf_bucket_create(type, allocator, ctx);
+}
+
+apr_status_t serf_ssl_set_hostname(serf_ssl_context_t *context,
+ const char * hostname)
+{
+#ifdef SSL_set_tlsext_host_name
+ if (SSL_set_tlsext_host_name(context->ssl, hostname) != 1) {
+ ERR_clear_error();
+ }
+#endif
+ return APR_SUCCESS;
+}
+
+apr_status_t serf_ssl_use_default_certificates(serf_ssl_context_t *ssl_ctx)
+{
+ X509_STORE *store = SSL_CTX_get_cert_store(ssl_ctx->ctx);
+
+ int result = X509_STORE_set_default_paths(store);
+
+ return result ? APR_SUCCESS : APR_EGENERAL;
+}
+
+apr_status_t serf_ssl_load_cert_file(
+ serf_ssl_certificate_t **cert,
+ const char *file_path,
+ apr_pool_t *pool)
+{
+ FILE *fp = fopen(file_path, "r");
+
+ if (fp) {
+ X509 *ssl_cert = PEM_read_X509(fp, NULL, NULL, NULL);
+ fclose(fp);
+
+ if (ssl_cert) {
+            *cert = apr_palloc(pool, sizeof(serf_ssl_certificate_t));
+            (*cert)->ssl_cert = ssl_cert;
+            (*cert)->depth = 0;  /* a cert loaded from disk has no chain position */
+
+ return APR_SUCCESS;
+ }
+ }
+
+ return APR_EGENERAL;
+}
+
+
+apr_status_t serf_ssl_trust_cert(
+ serf_ssl_context_t *ssl_ctx,
+ serf_ssl_certificate_t *cert)
+{
+ X509_STORE *store = SSL_CTX_get_cert_store(ssl_ctx->ctx);
+
+ int result = X509_STORE_add_cert(store, cert->ssl_cert);
+
+ return result ? APR_SUCCESS : APR_EGENERAL;
+}
+
+
+serf_bucket_t *serf_bucket_ssl_decrypt_create(
+ serf_bucket_t *stream,
+ serf_ssl_context_t *ssl_ctx,
+ serf_bucket_alloc_t *allocator)
+{
+ serf_bucket_t *bkt;
+ ssl_context_t *ctx;
+
+ bkt = serf_bucket_ssl_create(ssl_ctx, allocator,
+ &serf_bucket_type_ssl_decrypt);
+
+ ctx = bkt->data;
+
+ ctx->databuf = &ctx->ssl_ctx->decrypt.databuf;
+ if (ctx->ssl_ctx->decrypt.stream != NULL) {
+ return NULL;
+ }
+ ctx->ssl_ctx->decrypt.stream = stream;
+ ctx->our_stream = &ctx->ssl_ctx->decrypt.stream;
+
+ return bkt;
+}
+
+
+serf_ssl_context_t *serf_bucket_ssl_decrypt_context_get(
+ serf_bucket_t *bucket)
+{
+ ssl_context_t *ctx = bucket->data;
+ return ctx->ssl_ctx;
+}
+
+
+serf_bucket_t *serf_bucket_ssl_encrypt_create(
+ serf_bucket_t *stream,
+ serf_ssl_context_t *ssl_ctx,
+ serf_bucket_alloc_t *allocator)
+{
+ serf_bucket_t *bkt;
+ ssl_context_t *ctx;
+
+ bkt = serf_bucket_ssl_create(ssl_ctx, allocator,
+ &serf_bucket_type_ssl_encrypt);
+
+ ctx = bkt->data;
+
+ ctx->databuf = &ctx->ssl_ctx->encrypt.databuf;
+ ctx->our_stream = &ctx->ssl_ctx->encrypt.stream;
+ if (ctx->ssl_ctx->encrypt.stream == NULL) {
+ serf_bucket_t *tmp = serf_bucket_aggregate_create(stream->allocator);
+ serf_bucket_aggregate_append(tmp, stream);
+ ctx->ssl_ctx->encrypt.stream = tmp;
+ }
+ else {
+ bucket_list_t *new_list;
+
+ new_list = serf_bucket_mem_alloc(ctx->ssl_ctx->allocator,
+ sizeof(*new_list));
+ new_list->bucket = stream;
+ new_list->next = NULL;
+ if (ctx->ssl_ctx->encrypt.stream_next == NULL) {
+ ctx->ssl_ctx->encrypt.stream_next = new_list;
+ }
+ else {
+ bucket_list_t *scan = ctx->ssl_ctx->encrypt.stream_next;
+
+ while (scan->next != NULL)
+ scan = scan->next;
+ scan->next = new_list;
+ }
+ }
+
+ return bkt;
+}
+
+
+serf_ssl_context_t *serf_bucket_ssl_encrypt_context_get(
+ serf_bucket_t *bucket)
+{
+ ssl_context_t *ctx = bucket->data;
+ return ctx->ssl_ctx;
+}
+
+/* Functions to read a serf_ssl_certificate structure. */
+
+/* Creates a hash table with keys (E, CN, OU, O, L, ST and C). */
+static apr_hash_t *
+convert_X509_NAME_to_table(X509_NAME *org, apr_pool_t *pool)
+{
+ char buf[1024];
+ int ret;
+
+ apr_hash_t *tgt = apr_hash_make(pool);
+
+ ret = X509_NAME_get_text_by_NID(org,
+ NID_commonName,
+ buf, 1024);
+ if (ret != -1)
+ apr_hash_set(tgt, "CN", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf));
+ ret = X509_NAME_get_text_by_NID(org,
+ NID_pkcs9_emailAddress,
+ buf, 1024);
+ if (ret != -1)
+ apr_hash_set(tgt, "E", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf));
+ ret = X509_NAME_get_text_by_NID(org,
+ NID_organizationalUnitName,
+ buf, 1024);
+ if (ret != -1)
+ apr_hash_set(tgt, "OU", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf));
+ ret = X509_NAME_get_text_by_NID(org,
+ NID_organizationName,
+ buf, 1024);
+ if (ret != -1)
+ apr_hash_set(tgt, "O", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf));
+ ret = X509_NAME_get_text_by_NID(org,
+ NID_localityName,
+ buf, 1024);
+ if (ret != -1)
+ apr_hash_set(tgt, "L", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf));
+ ret = X509_NAME_get_text_by_NID(org,
+ NID_stateOrProvinceName,
+ buf, 1024);
+ if (ret != -1)
+ apr_hash_set(tgt, "ST", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf));
+ ret = X509_NAME_get_text_by_NID(org,
+ NID_countryName,
+ buf, 1024);
+ if (ret != -1)
+ apr_hash_set(tgt, "C", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf));
+
+ return tgt;
+}
+
+
+int serf_ssl_cert_depth(const serf_ssl_certificate_t *cert)
+{
+ return cert->depth;
+}
+
+
+apr_hash_t *serf_ssl_cert_issuer(
+ const serf_ssl_certificate_t *cert,
+ apr_pool_t *pool)
+{
+ X509_NAME *issuer = X509_get_issuer_name(cert->ssl_cert);
+
+ if (!issuer)
+ return NULL;
+
+ return convert_X509_NAME_to_table(issuer, pool);
+}
+
+
+apr_hash_t *serf_ssl_cert_subject(
+ const serf_ssl_certificate_t *cert,
+ apr_pool_t *pool)
+{
+ X509_NAME *subject = X509_get_subject_name(cert->ssl_cert);
+
+ if (!subject)
+ return NULL;
+
+ return convert_X509_NAME_to_table(subject, pool);
+}
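+
+/* Illustrative use (not part of this file): inside a callback registered
+ * via serf_ssl_server_cert_callback_set, the subject's common name can be
+ * pulled out of the returned table. pool is assumed to be a scratch pool
+ * owned by the caller.
+ *
+ *     apr_hash_t *subject = serf_ssl_cert_subject(cert, pool);
+ *     const char *cn = subject
+ *         ? apr_hash_get(subject, "CN", APR_HASH_KEY_STRING)
+ *         : NULL;
+ */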
+
+
+apr_hash_t *serf_ssl_cert_certificate(
+ const serf_ssl_certificate_t *cert,
+ apr_pool_t *pool)
+{
+ apr_hash_t *tgt = apr_hash_make(pool);
+ unsigned int md_size, i;
+ unsigned char md[EVP_MAX_MD_SIZE];
+ BIO *bio;
+ STACK_OF(GENERAL_NAME) *names;
+
+ /* sha1 fingerprint */
+ if (X509_digest(cert->ssl_cert, EVP_sha1(), md, &md_size)) {
+ const char hex[] = "0123456789ABCDEF";
+ char fingerprint[EVP_MAX_MD_SIZE * 3];
+
+ for (i=0; i<md_size; i++) {
+ fingerprint[3*i] = hex[(md[i] & 0xf0) >> 4];
+ fingerprint[(3*i)+1] = hex[(md[i] & 0x0f)];
+ fingerprint[(3*i)+2] = ':';
+ }
+ if (md_size > 0)
+ fingerprint[(3*(md_size-1))+2] = '\0';
+ else
+ fingerprint[0] = '\0';
+
+ apr_hash_set(tgt, "sha1", APR_HASH_KEY_STRING,
+ apr_pstrdup(pool, fingerprint));
+ }
+
+ /* set expiry dates */
+ bio = BIO_new(BIO_s_mem());
+ if (bio) {
+ ASN1_TIME *notBefore, *notAfter;
+ char buf[256];
+
+ memset (buf, 0, sizeof (buf));
+ notBefore = X509_get_notBefore(cert->ssl_cert);
+ if (ASN1_TIME_print(bio, notBefore)) {
+ BIO_read(bio, buf, 255);
+ apr_hash_set(tgt, "notBefore", APR_HASH_KEY_STRING,
+ apr_pstrdup(pool, buf));
+ }
+ memset (buf, 0, sizeof (buf));
+ notAfter = X509_get_notAfter(cert->ssl_cert);
+ if (ASN1_TIME_print(bio, notAfter)) {
+ BIO_read(bio, buf, 255);
+ apr_hash_set(tgt, "notAfter", APR_HASH_KEY_STRING,
+ apr_pstrdup(pool, buf));
+ }
+ }
+ BIO_free(bio);
+
+ /* Get subjectAltNames */
+ names = X509_get_ext_d2i(cert->ssl_cert, NID_subject_alt_name, NULL, NULL);
+ if (names) {
+ int names_count = sk_GENERAL_NAME_num(names);
+
+ apr_array_header_t *san_arr = apr_array_make(pool, names_count,
+ sizeof(char*));
+ apr_hash_set(tgt, "subjectAltName", APR_HASH_KEY_STRING, san_arr);
+ for (i = 0; i < names_count; i++) {
+ char *p = NULL;
+ GENERAL_NAME *nm = sk_GENERAL_NAME_value(names, i);
+
+ switch (nm->type) {
+ case GEN_DNS:
+ p = apr_pstrmemdup(pool, (const char *)nm->d.ia5->data,
+ nm->d.ia5->length);
+ break;
+ default:
+ /* Don't know what to do - skip. */
+ break;
+ }
+ if (p) {
+ APR_ARRAY_PUSH(san_arr, char*) = p;
+ }
+ }
+ sk_GENERAL_NAME_pop_free(names, GENERAL_NAME_free);
+ }
+
+ return tgt;
+}
+
+
+const char *serf_ssl_cert_export(
+ const serf_ssl_certificate_t *cert,
+ apr_pool_t *pool)
+{
+ char *binary_cert;
+ char *encoded_cert;
+ int len;
+ unsigned char *unused;
+
+ /* find the length of the DER encoding. */
+ len = i2d_X509(cert->ssl_cert, NULL);
+ if (len < 0) {
+ return NULL;
+ }
+
+ binary_cert = apr_palloc(pool, len);
+ unused = (unsigned char *)binary_cert;
+ len = i2d_X509(cert->ssl_cert, &unused); /* unused is incremented */
+ if (len < 0) {
+ return NULL;
+ }
+
+ encoded_cert = apr_palloc(pool, apr_base64_encode_len(len));
+ apr_base64_encode(encoded_cert, binary_cert, len);
+
+ return encoded_cert;
+}
+
+static void serf_ssl_destroy_and_data(serf_bucket_t *bucket)
+{
+ ssl_context_t *ctx = bucket->data;
+
+ if (!--ctx->ssl_ctx->refcount) {
+ ssl_free_context(ctx->ssl_ctx);
+ }
+
+ serf_default_destroy_and_data(bucket);
+}
+
+static void serf_ssl_decrypt_destroy_and_data(serf_bucket_t *bucket)
+{
+ ssl_context_t *ctx = bucket->data;
+
+ serf_bucket_destroy(*ctx->our_stream);
+
+ serf_ssl_destroy_and_data(bucket);
+}
+
+static void serf_ssl_encrypt_destroy_and_data(serf_bucket_t *bucket)
+{
+ ssl_context_t *ctx = bucket->data;
+ serf_ssl_context_t *ssl_ctx = ctx->ssl_ctx;
+
+ if (ssl_ctx->encrypt.stream == *ctx->our_stream) {
+ serf_bucket_destroy(*ctx->our_stream);
+ serf_bucket_destroy(ssl_ctx->encrypt.pending);
+
+ /* Reset our encrypted status and databuf. */
+ ssl_ctx->encrypt.status = APR_SUCCESS;
+ ssl_ctx->encrypt.databuf.status = APR_SUCCESS;
+
+ /* Advance to the next stream - if we have one. */
+ if (ssl_ctx->encrypt.stream_next == NULL) {
+ ssl_ctx->encrypt.stream = NULL;
+ ssl_ctx->encrypt.pending = NULL;
+ }
+ else {
+ bucket_list_t *cur;
+
+ cur = ssl_ctx->encrypt.stream_next;
+ ssl_ctx->encrypt.stream = cur->bucket;
+ ssl_ctx->encrypt.pending =
+ serf_bucket_aggregate_create(cur->bucket->allocator);
+ ssl_ctx->encrypt.stream_next = cur->next;
+ serf_bucket_mem_free(ssl_ctx->allocator, cur);
+ }
+ }
+ else {
+ /* Ah, darn. We haven't sent this one along yet. */
+ return;
+ }
+ serf_ssl_destroy_and_data(bucket);
+}
+
+static apr_status_t serf_ssl_read(serf_bucket_t *bucket,
+ apr_size_t requested,
+ const char **data, apr_size_t *len)
+{
+ ssl_context_t *ctx = bucket->data;
+
+ return serf_databuf_read(ctx->databuf, requested, data, len);
+}
+
+static apr_status_t serf_ssl_readline(serf_bucket_t *bucket,
+ int acceptable, int *found,
+ const char **data,
+ apr_size_t *len)
+{
+ ssl_context_t *ctx = bucket->data;
+
+ return serf_databuf_readline(ctx->databuf, acceptable, found, data, len);
+}
+
+static apr_status_t serf_ssl_peek(serf_bucket_t *bucket,
+ const char **data,
+ apr_size_t *len)
+{
+ ssl_context_t *ctx = bucket->data;
+
+ return serf_databuf_peek(ctx->databuf, data, len);
+}
+
+
+const serf_bucket_type_t serf_bucket_type_ssl_encrypt = {
+ "SSLENCRYPT",
+ serf_ssl_read,
+ serf_ssl_readline,
+ serf_default_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_ssl_peek,
+ serf_ssl_encrypt_destroy_and_data,
+};
+
+const serf_bucket_type_t serf_bucket_type_ssl_decrypt = {
+ "SSLDECRYPT",
+ serf_ssl_read,
+ serf_ssl_readline,
+ serf_default_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_default_read_bucket,
+ serf_ssl_peek,
+ serf_ssl_decrypt_destroy_and_data,
+};