From 74dd0183e4e56b07cedfa87eae7a8fb3166f01e8 Mon Sep 17 00:00:00 2001 From: Lorry Date: Tue, 28 Aug 2012 15:30:14 +0100 Subject: Tarball conversion --- buckets/aggregate_buckets.c | 400 +++++++++++ buckets/allocator.c | 434 ++++++++++++ buckets/barrier_buckets.c | 97 +++ buckets/buckets.c | 538 ++++++++++++++ buckets/bwtp_buckets.c | 596 ++++++++++++++++ buckets/chunk_buckets.c | 235 +++++++ buckets/dechunk_buckets.c | 185 +++++ buckets/deflate_buckets.c | 384 ++++++++++ buckets/file_buckets.c | 117 ++++ buckets/headers_buckets.c | 429 ++++++++++++ buckets/iovec_buckets.c | 169 +++++ buckets/limit_buckets.c | 134 ++++ buckets/mmap_buckets.c | 118 ++++ buckets/request_buckets.c | 228 ++++++ buckets/response_buckets.c | 429 ++++++++++++ buckets/simple_buckets.c | 142 ++++ buckets/socket_buckets.c | 114 +++ buckets/ssl_buckets.c | 1629 +++++++++++++++++++++++++++++++++++++++++++ 18 files changed, 6378 insertions(+) create mode 100644 buckets/aggregate_buckets.c create mode 100644 buckets/allocator.c create mode 100644 buckets/barrier_buckets.c create mode 100644 buckets/buckets.c create mode 100644 buckets/bwtp_buckets.c create mode 100644 buckets/chunk_buckets.c create mode 100644 buckets/dechunk_buckets.c create mode 100644 buckets/deflate_buckets.c create mode 100644 buckets/file_buckets.c create mode 100644 buckets/headers_buckets.c create mode 100644 buckets/iovec_buckets.c create mode 100644 buckets/limit_buckets.c create mode 100644 buckets/mmap_buckets.c create mode 100644 buckets/request_buckets.c create mode 100644 buckets/response_buckets.c create mode 100644 buckets/simple_buckets.c create mode 100644 buckets/socket_buckets.c create mode 100644 buckets/ssl_buckets.c (limited to 'buckets') diff --git a/buckets/aggregate_buckets.c b/buckets/aggregate_buckets.c new file mode 100644 index 0000000..d9d15a3 --- /dev/null +++ b/buckets/aggregate_buckets.c @@ -0,0 +1,400 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "serf.h" +#include "serf_bucket_util.h" + + +/* Should be an APR_RING? */ +typedef struct bucket_list { + serf_bucket_t *bucket; + struct bucket_list *next; +} bucket_list_t; + +typedef struct { + bucket_list_t *list; /* active buckets */ + bucket_list_t *last; /* last bucket of the list */ + bucket_list_t *done; /* we finished reading this; now pending a destroy */ + + serf_bucket_aggregate_eof_t hold_open; + void *hold_open_baton; + + /* Does this bucket own its children? !0 if yes, 0 if not. */ + int bucket_owner; +} aggregate_context_t; + + +static void cleanup_aggregate(aggregate_context_t *ctx, + serf_bucket_alloc_t *allocator) +{ + bucket_list_t *next_list; + + /* If we finished reading a bucket during the previous read, then + * we can now toss that bucket. + */ + while (ctx->done != NULL) { + next_list = ctx->done->next; + + if (ctx->bucket_owner) { + serf_bucket_destroy(ctx->done->bucket); + } + serf_bucket_mem_free(allocator, ctx->done); + + ctx->done = next_list; + } +} + +void serf_bucket_aggregate_cleanup( + serf_bucket_t *bucket, serf_bucket_alloc_t *allocator) +{ + aggregate_context_t *ctx = bucket->data; + + cleanup_aggregate(ctx, allocator); +} + +static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator) +{ + aggregate_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + + ctx->list = NULL; + ctx->last = NULL; + ctx->done = NULL; + ctx->hold_open = NULL; + ctx->hold_open_baton = NULL; + ctx->bucket_owner = 1; + + return ctx; +} + +serf_bucket_t *serf_bucket_aggregate_create( + serf_bucket_alloc_t *allocator) +{ + aggregate_context_t *ctx; + + ctx = create_aggregate(allocator); + + return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx); +} + +serf_bucket_t *serf__bucket_stream_create( + serf_bucket_alloc_t *allocator, + serf_bucket_aggregate_eof_t fn, + void *baton) +{ + serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator); + aggregate_context_t *ctx = bucket->data; + + serf_bucket_aggregate_hold_open(bucket, fn, baton); + + ctx->bucket_owner = 0; + + return bucket; +} + + +static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket) +{ + aggregate_context_t *ctx = bucket->data; + bucket_list_t *next_ctx; + + while (ctx->list) { + if (ctx->bucket_owner) { + serf_bucket_destroy(ctx->list->bucket); + } + next_ctx = ctx->list->next; + serf_bucket_mem_free(bucket->allocator, ctx->list); + ctx->list = next_ctx; + } + cleanup_aggregate(ctx, bucket->allocator); + + serf_default_destroy_and_data(bucket); +} + +void serf_bucket_aggregate_become(serf_bucket_t *bucket) +{ + aggregate_context_t *ctx; + + ctx = create_aggregate(bucket->allocator); + + bucket->type = &serf_bucket_type_aggregate; + bucket->data = ctx; + + /* The allocator remains the same. */ +} + + +void serf_bucket_aggregate_prepend( + serf_bucket_t *aggregate_bucket, + serf_bucket_t *prepend_bucket) +{ + aggregate_context_t *ctx = aggregate_bucket->data; + bucket_list_t *new_list; + + new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator, + sizeof(*new_list)); + new_list->bucket = prepend_bucket; + new_list->next = ctx->list; + + ctx->list = new_list; +} + +void serf_bucket_aggregate_append( + serf_bucket_t *aggregate_bucket, + serf_bucket_t *append_bucket) +{ + aggregate_context_t *ctx = aggregate_bucket->data; + bucket_list_t *new_list; + + new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator, + sizeof(*new_list)); + new_list->bucket = append_bucket; + new_list->next = NULL; + + /* If we use APR_RING, this is trivial. So, wait. + new_list->next = ctx->list; + ctx->list = new_list; + */ + if (ctx->list == NULL) { + ctx->list = new_list; + ctx->last = new_list; + } + else { + ctx->last->next = new_list; + ctx->last = ctx->last->next; + } +} + +void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket, + serf_bucket_aggregate_eof_t fn, + void *baton) +{ + aggregate_context_t *ctx = aggregate_bucket->data; + ctx->hold_open = fn; + ctx->hold_open_baton = baton; +} + +void serf_bucket_aggregate_prepend_iovec( + serf_bucket_t *aggregate_bucket, + struct iovec *vecs, + int vecs_count) +{ + int i; + + /* Add in reverse order. */ + for (i = vecs_count - 1; i >= 0; i--) { + serf_bucket_t *new_bucket; + + new_bucket = serf_bucket_simple_create(vecs[i].iov_base, + vecs[i].iov_len, + NULL, NULL, + aggregate_bucket->allocator); + + serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket); + + } +} + +void serf_bucket_aggregate_append_iovec( + serf_bucket_t *aggregate_bucket, + struct iovec *vecs, + int vecs_count) +{ + serf_bucket_t *new_bucket; + + new_bucket = serf_bucket_iovec_create(vecs, vecs_count, + aggregate_bucket->allocator); + + serf_bucket_aggregate_append(aggregate_bucket, new_bucket); +} + +static apr_status_t read_aggregate(serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, struct iovec *vecs, + int *vecs_used) +{ + aggregate_context_t *ctx = bucket->data; + int cur_vecs_used; + apr_status_t status; + + *vecs_used = 0; + + if (!ctx->list) { + if (ctx->hold_open) { + return ctx->hold_open(ctx->hold_open_baton, bucket); + } + else { + return APR_EOF; + } + } + + status = APR_SUCCESS; + while (requested) { + serf_bucket_t *head = ctx->list->bucket; + + status = serf_bucket_read_iovec(head, requested, vecs_size, vecs, + &cur_vecs_used); + + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + /* Add the number of vecs we read to our running total. */ + *vecs_used += cur_vecs_used; + + if (cur_vecs_used > 0 || status) { + bucket_list_t *next_list; + + /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now + * as it isn't safe to read more without returning to our caller. + */ + if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) { + return status; + } + + /* However, if we read EOF, we can stash this bucket in a + * to-be-freed list and move on to the next bucket. This ensures + * that the bucket stays alive (so as not to violate our read + * semantics). We'll destroy this list of buckets the next time + * we are asked to perform a read operation - thus ensuring the + * proper read lifetime. + */ + next_list = ctx->list->next; + ctx->list->next = ctx->done; + ctx->done = ctx->list; + ctx->list = next_list; + + /* If we have no more in our list, return EOF. */ + if (!ctx->list) { + if (ctx->hold_open) { + return ctx->hold_open(ctx->hold_open_baton, bucket); + } + else { + return APR_EOF; + } + } + + /* At this point, it safe to read the next bucket - if we can. */ + + /* If the caller doesn't want ALL_AVAIL, decrement the size + * of the items we just read from the list. + */ + if (requested != SERF_READ_ALL_AVAIL) { + int i; + + for (i = 0; i < cur_vecs_used; i++) + requested -= vecs[i].iov_len; + } + + /* Adjust our vecs to account for what we just read. */ + vecs_size -= cur_vecs_used; + vecs += cur_vecs_used; + + /* We reached our max. Oh well. */ + if (!requested || !vecs_size) { + return APR_SUCCESS; + } + } + } + + return status; +} + +static apr_status_t serf_aggregate_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + aggregate_context_t *ctx = bucket->data; + struct iovec vec; + int vecs_used; + apr_status_t status; + + cleanup_aggregate(ctx, bucket->allocator); + + status = read_aggregate(bucket, requested, 1, &vec, &vecs_used); + + if (!vecs_used) { + *len = 0; + } + else { + *data = vec.iov_base; + *len = vec.iov_len; + } + + return status; +} + +static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, + struct iovec *vecs, + int *vecs_used) +{ + aggregate_context_t *ctx = bucket->data; + + cleanup_aggregate(ctx, bucket->allocator); + + return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used); +} + +static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + /* Follow pattern from serf_aggregate_read. */ + return APR_ENOTIMPL; +} + +static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + /* Follow pattern from serf_aggregate_read. */ + return APR_ENOTIMPL; +} + +static serf_bucket_t * serf_aggregate_read_bucket( + serf_bucket_t *bucket, + const serf_bucket_type_t *type) +{ + aggregate_context_t *ctx = bucket->data; + serf_bucket_t *found_bucket; + + if (!ctx->list) { + return NULL; + } + + if (ctx->list->bucket->type == type) { + /* Got the bucket. Consume it from our list. */ + found_bucket = ctx->list->bucket; + ctx->list = ctx->list->next; + return found_bucket; + } + + /* Call read_bucket on first one in our list. */ + return serf_bucket_read_bucket(ctx->list->bucket, type); +} + + +const serf_bucket_type_t serf_bucket_type_aggregate = { + "AGGREGATE", + serf_aggregate_read, + serf_aggregate_readline, + serf_aggregate_read_iovec, + serf_default_read_for_sendfile, + serf_aggregate_read_bucket, + serf_aggregate_peek, + serf_aggregate_destroy_and_data, +}; diff --git a/buckets/allocator.c b/buckets/allocator.c new file mode 100644 index 0000000..857cc74 --- /dev/null +++ b/buckets/allocator.c @@ -0,0 +1,434 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +#include "serf.h" +#include "serf_bucket_util.h" + + +typedef struct node_header_t { + apr_size_t size; + union { + struct node_header_t *next; /* if size == 0 (freed/inactive) */ + /* no data if size == STANDARD_NODE_SIZE */ + apr_memnode_t *memnode; /* if size > STANDARD_NODE_SIZE */ + } u; +} node_header_t; + +/* The size of a node_header_t, properly aligned. Note that (normally) + * this macro will round the size to a multiple of 8 bytes. Keep this in + * mind when altering the node_header_t structure. Also, keep in mind that + * node_header_t is an overhead for every allocation performed through + * the serf_bucket_mem_alloc() function. + */ +#define SIZEOF_NODE_HEADER_T APR_ALIGN_DEFAULT(sizeof(node_header_t)) + + +/* STANDARD_NODE_SIZE is manually set to an allocation size that will + * capture most allocators performed via this API. It must be "large + * enough" to avoid lots of spillage to allocating directly from the + * apr_allocator associated with the bucket allocator. The apr_allocator + * has a minimum size of 8k, which can be expensive if you missed the + * STANDARD_NODE_SIZE by just a few bytes. + */ +/* ### we should define some rules or ways to determine how to derive + * ### a "good" value for this. probably log some stats on allocs, then + * ### analyze them for size "misses". then find the balance point between + * ### wasted space due to min-size allocator, and wasted-space due to + * ### size-spill to the 8k minimum. + */ +#define STANDARD_NODE_SIZE 128 + +/* When allocating a block of memory from the allocator, we should go for + * an 8k block, minus the overhead that the allocator needs. + */ +#define ALLOC_AMT (8192 - APR_MEMNODE_T_SIZE) + +/* Define DEBUG_DOUBLE_FREE if you're interested in debugging double-free + * calls to serf_bucket_mem_free(). + */ +#define DEBUG_DOUBLE_FREE + + +typedef struct { + const serf_bucket_t *bucket; + apr_status_t last; +} read_status_t; + +#define TRACK_BUCKET_COUNT 100 /* track N buckets' status */ + +typedef struct { + int next_index; /* info[] is a ring. next bucket goes at this idx. */ + int num_used; + + read_status_t info[TRACK_BUCKET_COUNT]; +} track_state_t; + + +struct serf_bucket_alloc_t { + apr_pool_t *pool; + apr_allocator_t *allocator; + int own_allocator; + + serf_unfreed_func_t unfreed; + void *unfreed_baton; + + apr_uint32_t num_alloc; + + node_header_t *freelist; /* free STANDARD_NODE_SIZE blocks */ + apr_memnode_t *blocks; /* blocks we allocated for subdividing */ + + track_state_t *track; +}; + +/* ==================================================================== */ + + +static apr_status_t allocator_cleanup(void *data) +{ + serf_bucket_alloc_t *allocator = data; + + /* If we allocated anything, give it back. */ + if (allocator->blocks) { + apr_allocator_free(allocator->allocator, allocator->blocks); + } + + /* If we allocated our own allocator (?!), destroy it here. */ + if (allocator->own_allocator) { + apr_allocator_destroy(allocator->allocator); + } + + return APR_SUCCESS; +} + +serf_bucket_alloc_t *serf_bucket_allocator_create( + apr_pool_t *pool, + serf_unfreed_func_t unfreed, + void *unfreed_baton) +{ + serf_bucket_alloc_t *allocator = apr_pcalloc(pool, sizeof(*allocator)); + + allocator->pool = pool; + allocator->allocator = apr_pool_allocator_get(pool); + if (allocator->allocator == NULL) { + /* This most likely means pools are running in debug mode, create our + * own allocator to deal with memory ourselves */ + apr_allocator_create(&allocator->allocator); + allocator->own_allocator = 1; + } + allocator->unfreed = unfreed; + allocator->unfreed_baton = unfreed_baton; + +#ifdef SERF_DEBUG_BUCKET_USE + { + track_state_t *track; + + track = allocator->track = apr_palloc(pool, sizeof(*allocator->track)); + track->next_index = 0; + track->num_used = 0; + } +#endif + + /* ### this implies buckets cannot cross a fork/exec. desirable? + * + * ### hmm. it probably also means that buckets cannot be AROUND + * ### during a fork/exec. the new process will try to clean them + * ### up and figure out there are unfreed blocks... + */ + apr_pool_cleanup_register(pool, allocator, + allocator_cleanup, allocator_cleanup); + + return allocator; +} + +apr_pool_t *serf_bucket_allocator_get_pool( + const serf_bucket_alloc_t *allocator) +{ + return allocator->pool; +} + + +void *serf_bucket_mem_alloc( + serf_bucket_alloc_t *allocator, + apr_size_t size) +{ + node_header_t *node; + + ++allocator->num_alloc; + + size += SIZEOF_NODE_HEADER_T; + if (size <= STANDARD_NODE_SIZE) { + if (allocator->freelist) { + /* just pull a node off our freelist */ + node = allocator->freelist; + allocator->freelist = node->u.next; +#ifdef DEBUG_DOUBLE_FREE + /* When we free an item, we set its size to zero. Thus, when + * we return it to the caller, we must ensure the size is set + * properly. + */ + node->size = STANDARD_NODE_SIZE; +#endif + } + else { + apr_memnode_t *active = allocator->blocks; + + if (active == NULL + || active->first_avail + STANDARD_NODE_SIZE >= active->endp) { + apr_memnode_t *head = allocator->blocks; + + /* ran out of room. grab another block. */ + active = apr_allocator_alloc(allocator->allocator, ALLOC_AMT); + + /* System couldn't provide us with memory. */ + if (active == NULL) + return NULL; + + /* link the block into our tracking list */ + allocator->blocks = active; + active->next = head; + } + + node = (node_header_t *)active->first_avail; + node->size = STANDARD_NODE_SIZE; + active->first_avail += STANDARD_NODE_SIZE; + } + } + else { + apr_memnode_t *memnode = apr_allocator_alloc(allocator->allocator, + size); + + if (memnode == NULL) + return NULL; + + node = (node_header_t *)memnode->first_avail; + node->u.memnode = memnode; + node->size = size; + } + + return ((char *)node) + SIZEOF_NODE_HEADER_T; +} + + +void *serf_bucket_mem_calloc( + serf_bucket_alloc_t *allocator, + apr_size_t size) +{ + void *mem; + mem = serf_bucket_mem_alloc(allocator, size); + if (mem == NULL) + return NULL; + memset(mem, 0, size); + return mem; +} + + +void serf_bucket_mem_free( + serf_bucket_alloc_t *allocator, + void *block) +{ + node_header_t *node; + + --allocator->num_alloc; + + node = (node_header_t *)((char *)block - SIZEOF_NODE_HEADER_T); + + if (node->size == STANDARD_NODE_SIZE) { + /* put the node onto our free list */ + node->u.next = allocator->freelist; + allocator->freelist = node; + +#ifdef DEBUG_DOUBLE_FREE + /* note that this thing was freed. */ + node->size = 0; + } + else if (node->size == 0) { + /* damn thing was freed already. */ + abort(); +#endif + } + else { +#ifdef DEBUG_DOUBLE_FREE + /* note that this thing was freed. */ + node->size = 0; +#endif + + /* now free it */ + apr_allocator_free(allocator->allocator, node->u.memnode); + } +} + + +/* ==================================================================== */ + + +#ifdef SERF_DEBUG_BUCKET_USE + +static read_status_t *find_read_status( + track_state_t *track, + const serf_bucket_t *bucket, + int create_rs) +{ + read_status_t *rs; + + if (track->num_used) { + int count = track->num_used; + int idx = track->next_index; + + /* Search backwards. In all likelihood, the bucket which just got + * read was read very recently. + */ + while (count-- > 0) { + if (!idx--) { + /* assert: track->num_used == TRACK_BUCKET_COUNT */ + idx = track->num_used - 1; + } + if ((rs = &track->info[idx])->bucket == bucket) { + return rs; + } + } + } + + /* Only create a new read_status_t when asked. */ + if (!create_rs) + return NULL; + + if (track->num_used < TRACK_BUCKET_COUNT) { + /* We're still filling up the ring. */ + ++track->num_used; + } + + rs = &track->info[track->next_index]; + rs->bucket = bucket; + rs->last = APR_SUCCESS; /* ### the right initial value? */ + + if (++track->next_index == TRACK_BUCKET_COUNT) + track->next_index = 0; + + return rs; +} + +#endif /* SERF_DEBUG_BUCKET_USE */ + + +apr_status_t serf_debug__record_read( + const serf_bucket_t *bucket, + apr_status_t status) +{ +#ifndef SERF_DEBUG_BUCKET_USE + return status; +#else + + track_state_t *track = bucket->allocator->track; + read_status_t *rs = find_read_status(track, bucket, 1); + + /* Validate that the previous status value allowed for another read. */ + if (APR_STATUS_IS_EAGAIN(rs->last) /* ### or APR_EOF? */) { + /* Somebody read when they weren't supposed to. Bail. */ + abort(); + } + + /* Save the current status for later. */ + rs->last = status; + + return status; +#endif +} + + +void serf_debug__entered_loop(serf_bucket_alloc_t *allocator) +{ +#ifdef SERF_DEBUG_BUCKET_USE + + track_state_t *track = allocator->track; + read_status_t *rs = &track->info[0]; + + for ( ; track->num_used; --track->num_used, ++rs ) { + if (rs->last == APR_SUCCESS) { + /* Somebody should have read this bucket again. */ + abort(); + } + + /* ### other status values? */ + } + + /* num_used was reset. also need to reset the next index. */ + track->next_index = 0; + +#endif +} + + +void serf_debug__closed_conn(serf_bucket_alloc_t *allocator) +{ +#ifdef SERF_DEBUG_BUCKET_USE + + /* Just reset the number used so that we don't examine the info[] */ + allocator->track->num_used = 0; + allocator->track->next_index = 0; + +#endif +} + + +void serf_debug__bucket_destroy(const serf_bucket_t *bucket) +{ +#ifdef SERF_DEBUG_BUCKET_USE + + track_state_t *track = bucket->allocator->track; + read_status_t *rs = find_read_status(track, bucket, 0); + + if (rs != NULL && rs->last != APR_EOF) { + /* The bucket was destroyed before it was read to completion. */ + + /* Special exception for socket buckets. If a connection remains + * open, they are not read to completion. + */ + if (SERF_BUCKET_IS_SOCKET(bucket)) + return; + + /* Ditto for SSL Decrypt buckets. */ + if (SERF_BUCKET_IS_SSL_DECRYPT(bucket)) + return; + + /* Ditto for SSL Encrypt buckets. */ + if (SERF_BUCKET_IS_SSL_ENCRYPT(bucket)) + return; + + /* Ditto for barrier buckets. */ + if (SERF_BUCKET_IS_BARRIER(bucket)) + return; + + + abort(); + } + +#endif +} + + +void serf_debug__bucket_alloc_check( + serf_bucket_alloc_t *allocator) +{ +#ifdef SERF_DEBUG_BUCKET_USE + if (allocator->num_alloc != 0) { + abort(); + } +#endif +} + diff --git a/buckets/barrier_buckets.c b/buckets/barrier_buckets.c new file mode 100644 index 0000000..eb410ee --- /dev/null +++ b/buckets/barrier_buckets.c @@ -0,0 +1,97 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "serf.h" +#include "serf_bucket_util.h" + + +typedef struct { + serf_bucket_t *stream; +} barrier_context_t; + + +serf_bucket_t *serf_bucket_barrier_create( + serf_bucket_t *stream, + serf_bucket_alloc_t *allocator) +{ + barrier_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->stream = stream; + + return serf_bucket_create(&serf_bucket_type_barrier, allocator, ctx); +} + +static apr_status_t serf_barrier_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + barrier_context_t *ctx = bucket->data; + + return serf_bucket_read(ctx->stream, requested, data, len); +} + +static apr_status_t serf_barrier_read_iovec(serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, struct iovec *vecs, + int *vecs_used) +{ + barrier_context_t *ctx = bucket->data; + + return serf_bucket_read_iovec(ctx->stream, requested, vecs_size, vecs, + vecs_used); +} + +static apr_status_t serf_barrier_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + barrier_context_t *ctx = bucket->data; + + return serf_bucket_readline(ctx->stream, acceptable, found, data, len); +} + +static apr_status_t serf_barrier_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + barrier_context_t *ctx = bucket->data; + + return serf_bucket_peek(ctx->stream, data, len); +} + +static void serf_barrier_destroy(serf_bucket_t *bucket) +{ + /* The intent of this bucket is not to let our wrapped buckets be + * destroyed. */ + + /* The option is for us to go ahead and 'eat' this bucket now, + * or just ignore the deletion entirely. + */ + serf_default_destroy_and_data(bucket); +} + +const serf_bucket_type_t serf_bucket_type_barrier = { + "BARRIER", + serf_barrier_read, + serf_barrier_readline, + serf_barrier_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_barrier_peek, + serf_barrier_destroy, +}; diff --git a/buckets/buckets.c b/buckets/buckets.c new file mode 100644 index 0000000..29175ff --- /dev/null +++ b/buckets/buckets.c @@ -0,0 +1,538 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "serf.h" +#include "serf_bucket_util.h" + + +serf_bucket_t *serf_bucket_create( + const serf_bucket_type_t *type, + serf_bucket_alloc_t *allocator, + void *data) +{ + serf_bucket_t *bkt = serf_bucket_mem_alloc(allocator, sizeof(*bkt)); + + bkt->type = type; + bkt->data = data; + bkt->allocator = allocator; + + return bkt; +} + + +apr_status_t serf_default_read_iovec( + serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, + struct iovec *vecs, + int *vecs_used) +{ + const char *data; + apr_size_t len; + + /* Read some data from the bucket. + * + * Because we're an internal 'helper' to the bucket, we can't call the + * normal serf_bucket_read() call because the debug allocator tracker will + * end up marking the bucket as read *twice* - once for us and once for + * our caller - which is reading the same bucket. This leads to premature + * abort()s if we ever see EAGAIN. Instead, we'll go directly to the + * vtable and bypass the debug tracker. + */ + apr_status_t status = bucket->type->read(bucket, requested, &data, &len); + + /* assert that vecs_size >= 1 ? */ + + /* Return that data as a single iovec. */ + if (len) { + vecs[0].iov_base = (void *)data; /* loses the 'const' */ + vecs[0].iov_len = len; + *vecs_used = 1; + } + else { + *vecs_used = 0; + } + + return status; +} + + +apr_status_t serf_default_read_for_sendfile( + serf_bucket_t *bucket, + apr_size_t requested, + apr_hdtr_t *hdtr, + apr_file_t **file, + apr_off_t *offset, + apr_size_t *len) +{ + /* Read a bunch of stuff into the headers. + * + * See serf_default_read_iovec as to why we call into the vtable + * directly. + */ + apr_status_t status = bucket->type->read_iovec(bucket, requested, + hdtr->numheaders, + hdtr->headers, + &hdtr->numheaders); + + /* There isn't a file, and there are no trailers. */ + *file = NULL; + hdtr->numtrailers = 0; + + return status; +} + + +serf_bucket_t *serf_default_read_bucket( + serf_bucket_t *bucket, + const serf_bucket_type_t *type) +{ + return NULL; +} + + +void serf_default_destroy(serf_bucket_t *bucket) +{ +#ifdef SERF_DEBUG_BUCKET_USE + serf_debug__bucket_destroy(bucket); +#endif + + serf_bucket_mem_free(bucket->allocator, bucket); +} + + +void serf_default_destroy_and_data(serf_bucket_t *bucket) +{ + serf_bucket_mem_free(bucket->allocator, bucket->data); + serf_default_destroy(bucket); +} + + +/* ==================================================================== */ + + +char *serf_bstrmemdup(serf_bucket_alloc_t *allocator, + const char *str, + apr_size_t size) +{ + char *newstr = serf_bucket_mem_alloc(allocator, size + 1); + memcpy(newstr, str, size); + newstr[size] = '\0'; + return newstr; +} + + +void *serf_bmemdup(serf_bucket_alloc_t *allocator, + const void *mem, + apr_size_t size) +{ + void *newmem = serf_bucket_mem_alloc(allocator, size); + memcpy(newmem, mem, size); + return newmem; +} + + +char *serf_bstrdup(serf_bucket_alloc_t *allocator, + const char *str) +{ + apr_size_t size = strlen(str) + 1; + char *newstr = serf_bucket_mem_alloc(allocator, size); + memcpy(newstr, str, size); + return newstr; +} + + +/* ==================================================================== */ + + +static void find_crlf(const char **data, apr_size_t *len, int *found) +{ + const char *start = *data; + const char *end = start + *len; + + while (start < end) { + const char *cr = memchr(start, '\r', *len); + + if (cr == NULL) { + break; + } + ++cr; + + if (cr < end && cr[0] == '\n') { + *len -= cr + 1 - start; + *data = cr + 1; + *found = SERF_NEWLINE_CRLF; + return; + } + if (cr == end) { + *len = 0; + *data = end; + *found = SERF_NEWLINE_CRLF_SPLIT; + return; + } + + /* It was a bare CR without an LF. Just move past it. */ + *len -= cr - start; + start = cr; + } + + *data = start + *len; + *len -= *data - start; + *found = SERF_NEWLINE_NONE; +} + + +void serf_util_readline( + const char **data, + apr_size_t *len, + int acceptable, + int *found) +{ + const char *start; + const char *cr; + const char *lf; + int want_cr; + int want_crlf; + int want_lf; + + /* If _only_ CRLF is acceptable, then the scanning needs a loop to + * skip false hits on CR characters. Use a separate function. + */ + if (acceptable == SERF_NEWLINE_CRLF) { + find_crlf(data, len, found); + return; + } + + start = *data; + cr = lf = NULL; + want_cr = acceptable & SERF_NEWLINE_CR; + want_crlf = acceptable & SERF_NEWLINE_CRLF; + want_lf = acceptable & SERF_NEWLINE_LF; + + if (want_cr || want_crlf) { + cr = memchr(start, '\r', *len); + } + if (want_lf) { + lf = memchr(start, '\n', *len); + } + + if (cr != NULL) { + if (lf != NULL) { + if (cr + 1 == lf) + *found = want_crlf ? SERF_NEWLINE_CRLF : SERF_NEWLINE_CR; + else if (want_cr && cr < lf) + *found = SERF_NEWLINE_CR; + else + *found = SERF_NEWLINE_LF; + } + else if (cr == start + *len - 1) { + /* the CR occurred in the last byte of the buffer. this could be + * a CRLF split across the data boundary. + * ### FIX THIS LOGIC? does caller need to detect? + */ + *found = want_crlf ? SERF_NEWLINE_CRLF_SPLIT : SERF_NEWLINE_CR; + } + else if (want_cr) + *found = SERF_NEWLINE_CR; + else /* want_crlf */ + *found = SERF_NEWLINE_NONE; + } + else if (lf != NULL) + *found = SERF_NEWLINE_LF; + else + *found = SERF_NEWLINE_NONE; + + switch (*found) { + case SERF_NEWLINE_LF: + *data = lf + 1; + break; + case SERF_NEWLINE_CR: + case SERF_NEWLINE_CRLF: + case SERF_NEWLINE_CRLF_SPLIT: + *data = cr + 1 + (*found == SERF_NEWLINE_CRLF); + break; + case SERF_NEWLINE_NONE: + *data += *len; + break; + default: + /* Not reachable */ + return; + } + + *len -= *data - start; +} + + +/* ==================================================================== */ + + +void serf_databuf_init(serf_databuf_t *databuf) +{ + /* nothing is sitting in the buffer */ + databuf->remaining = 0; + + /* avoid thinking we have hit EOF */ + databuf->status = APR_SUCCESS; +} + +/* Ensure the buffer is prepared for reading. Will return APR_SUCCESS, + * APR_EOF, or some failure code. *len is only set for EOF. */ +static apr_status_t common_databuf_prep(serf_databuf_t *databuf, + apr_size_t *len) +{ + apr_size_t readlen; + apr_status_t status; + + /* if there is data in the buffer, then we're happy. */ + if (databuf->remaining > 0) + return APR_SUCCESS; + + /* if we already hit EOF, then keep returning that. */ + if (APR_STATUS_IS_EOF(databuf->status)) { + /* *data = NULL; ?? */ + *len = 0; + return APR_EOF; + } + + /* refill the buffer */ + status = (*databuf->read)(databuf->read_baton, sizeof(databuf->buf), + databuf->buf, &readlen); + if (SERF_BUCKET_READ_ERROR(status)) { + return status; + } + + databuf->current = databuf->buf; + databuf->remaining = readlen; + databuf->status = status; + + return APR_SUCCESS; +} + + +apr_status_t serf_databuf_read( + serf_databuf_t *databuf, + apr_size_t requested, + const char **data, + apr_size_t *len) +{ + apr_status_t status = common_databuf_prep(databuf, len); + if (status) + return status; + + /* peg the requested amount to what we have remaining */ + if (requested == SERF_READ_ALL_AVAIL || requested > databuf->remaining) + requested = databuf->remaining; + + /* return the values */ + *data = databuf->current; + *len = requested; + + /* adjust our internal state to note we've consumed some data */ + databuf->current += requested; + databuf->remaining -= requested; + + /* If we read everything, then we need to return whatever the data + * read returned to us. This is going to be APR_EOF or APR_EGAIN. + * If we have NOT read everything, then return APR_SUCCESS to indicate + * that we're ready to return some more if asked. + */ + return databuf->remaining ? APR_SUCCESS : databuf->status; +} + + +apr_status_t serf_databuf_readline( + serf_databuf_t *databuf, + int acceptable, + int *found, + const char **data, + apr_size_t *len) +{ + apr_status_t status = common_databuf_prep(databuf, len); + if (status) + return status; + + /* the returned line will start at the current position. */ + *data = databuf->current; + + /* read a line from the buffer, and adjust the various pointers. */ + serf_util_readline(&databuf->current, &databuf->remaining, acceptable, + found); + + /* the length matches the amount consumed by the readline */ + *len = databuf->current - *data; + + /* see serf_databuf_read's return condition */ + return databuf->remaining ? APR_SUCCESS : databuf->status; +} + + +apr_status_t serf_databuf_peek( + serf_databuf_t *databuf, + const char **data, + apr_size_t *len) +{ + apr_status_t status = common_databuf_prep(databuf, len); + if (status) + return status; + + /* return everything we have */ + *data = databuf->current; + *len = databuf->remaining; + + /* If the last read returned EOF, then the peek should return the same. + * The other possibility in databuf->status is APR_EAGAIN, which we + * should never return. Thus, just return APR_SUCCESS for non-EOF cases. + */ + if (APR_STATUS_IS_EOF(databuf->status)) + return APR_EOF; + return APR_SUCCESS; +} + + +/* ==================================================================== */ + + +void serf_linebuf_init(serf_linebuf_t *linebuf) +{ + linebuf->state = SERF_LINEBUF_EMPTY; + linebuf->used = 0; +} + + +apr_status_t serf_linebuf_fetch( + serf_linebuf_t *linebuf, + serf_bucket_t *bucket, + int acceptable) +{ + /* If we had a complete line, then assume the caller has used it, so + * we can now reset the state. + */ + if (linebuf->state == SERF_LINEBUF_READY) { + linebuf->state = SERF_LINEBUF_EMPTY; + + /* Reset the line_used, too, so we don't have to test the state + * before using this value. + */ + linebuf->used = 0; + } + + while (1) { + apr_status_t status; + const char *data; + apr_size_t len; + + if (linebuf->state == SERF_LINEBUF_CRLF_SPLIT) { + /* On the previous read, we received just a CR. The LF might + * be present, but the bucket couldn't see it. We need to + * examine a single character to determine how to handle the + * split CRLF. + */ + + status = serf_bucket_peek(bucket, &data, &len); + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + if (len > 0) { + if (*data == '\n') { + /* We saw the second part of CRLF. We don't need to + * save that character, so do an actual read to suck + * up that character. + */ + /* ### check status */ + (void) serf_bucket_read(bucket, 1, &data, &len); + } + /* else: + * We saw the first character of the next line. Thus, + * the current line is terminated by the CR. Just + * ignore whatever we peeked at. The next reader will + * see it and handle it as appropriate. + */ + + /* Whatever was read, the line is now ready for use. */ + linebuf->state = SERF_LINEBUF_READY; + } + /* ### we need data. gotta check this char. bail if zero?! */ + /* else len == 0 */ + + /* ### status */ + } + else { + int found; + + status = serf_bucket_readline(bucket, acceptable, &found, + &data, &len); + if (SERF_BUCKET_READ_ERROR(status)) { + return status; + } + /* Some bucket types (socket) might need an extra read to find + out EOF state, so they'll return no data in that read. This + means we're done reading, return what we got. */ + if (APR_STATUS_IS_EOF(status) && len == 0) { + return status; + } + if (linebuf->used + len > sizeof(linebuf->line)) { + /* ### need a "line too long" error */ + return APR_EGENERAL; + } + + /* Note: our logic doesn't change for SERF_LINEBUF_PARTIAL. That + * only affects how we fill the buffer. It is a communication to + * our caller on whether the line is ready or not. + */ + + /* If we didn't see a newline, then we should mark the line + * buffer as partially complete. + */ + if (found == SERF_NEWLINE_NONE) { + linebuf->state = SERF_LINEBUF_PARTIAL; + } + else if (found == SERF_NEWLINE_CRLF_SPLIT) { + linebuf->state = SERF_LINEBUF_CRLF_SPLIT; + + /* Toss the partial CR. We won't ever need it. */ + --len; + } + else { + /* We got a newline (of some form). We don't need it + * in the line buffer, so back up the length. Then + * mark the line as ready. + */ + len -= 1 + (found == SERF_NEWLINE_CRLF); + + linebuf->state = SERF_LINEBUF_READY; + } + + /* ### it would be nice to avoid this copy if at all possible, + ### and just return the a data/len pair to the caller. we're + ### keeping it simple for now. */ + memcpy(&linebuf->line[linebuf->used], data, len); + linebuf->used += len; + } + + /* If we saw anything besides "success. please read again", then + * we should return that status. If the line was completed, then + * we should also return. + */ + if (status || linebuf->state == SERF_LINEBUF_READY) + return status; + + /* We got APR_SUCCESS and the line buffer is not complete. Let's + * loop to read some more data. + */ + } + /* NOTREACHED */ +} diff --git a/buckets/bwtp_buckets.c b/buckets/bwtp_buckets.c new file mode 100644 index 0000000..7ef3047 --- /dev/null +++ b/buckets/bwtp_buckets.c @@ -0,0 +1,596 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include "serf.h" +#include "serf_bucket_util.h" +#include "serf_bucket_types.h" + +#include + +/* This is an implementation of Bidirectional Web Transfer Protocol (BWTP) + * See: + * http://bwtp.wikidot.com/ + */ + +typedef struct { + int channel; + int open; + int type; /* 0 = header, 1 = message */ /* TODO enum? */ + const char *phrase; + serf_bucket_t *headers; + + char req_line[1000]; +} frame_context_t; + +typedef struct { + serf_bucket_t *stream; + serf_bucket_t *body; /* Pointer to the stream wrapping the body. */ + serf_bucket_t *headers; /* holds parsed headers */ + + enum { + STATE_STATUS_LINE, /* reading status line */ + STATE_HEADERS, /* reading headers */ + STATE_BODY, /* reading body */ + STATE_DONE /* we've sent EOF */ + } state; + + /* Buffer for accumulating a line from the response. */ + serf_linebuf_t linebuf; + + int type; /* 0 = header, 1 = message */ /* TODO enum? */ + int channel; + char *phrase; + apr_size_t length; +} incoming_context_t; + + +serf_bucket_t *serf_bucket_bwtp_channel_close( + int channel, + serf_bucket_alloc_t *allocator) +{ + frame_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->type = 0; + ctx->open = 0; + ctx->channel = channel; + ctx->phrase = "CLOSED"; + ctx->headers = serf_bucket_headers_create(allocator); + + return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); +} + +serf_bucket_t *serf_bucket_bwtp_channel_open( + int channel, + const char *uri, + serf_bucket_alloc_t *allocator) +{ + frame_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->type = 0; + ctx->open = 1; + ctx->channel = channel; + ctx->phrase = uri; + ctx->headers = serf_bucket_headers_create(allocator); + + return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); +} + +serf_bucket_t *serf_bucket_bwtp_header_create( + int channel, + const char *phrase, + serf_bucket_alloc_t *allocator) +{ + frame_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->type = 0; + ctx->open = 0; + ctx->channel = channel; + ctx->phrase = phrase; + ctx->headers = serf_bucket_headers_create(allocator); + + return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); +} + +serf_bucket_t *serf_bucket_bwtp_message_create( + int channel, + serf_bucket_t *body, + serf_bucket_alloc_t *allocator) +{ + frame_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->type = 1; + ctx->open = 0; + ctx->channel = channel; + ctx->phrase = "MESSAGE"; + ctx->headers = serf_bucket_headers_create(allocator); + + return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); +} + +int serf_bucket_bwtp_frame_get_channel( + serf_bucket_t *bucket) +{ + if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { + frame_context_t *ctx = bucket->data; + + return ctx->channel; + } + else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { + incoming_context_t *ctx = bucket->data; + + return ctx->channel; + } + + return -1; +} + +int serf_bucket_bwtp_frame_get_type( + serf_bucket_t *bucket) +{ + if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { + frame_context_t *ctx = bucket->data; + + return ctx->type; + } + else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { + incoming_context_t *ctx = bucket->data; + + return ctx->type; + } + + return -1; +} + +const char *serf_bucket_bwtp_frame_get_phrase( + serf_bucket_t *bucket) +{ + if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { + frame_context_t *ctx = bucket->data; + + return ctx->phrase; + } + else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { + incoming_context_t *ctx = bucket->data; + + return ctx->phrase; + } + + return NULL; +} + +serf_bucket_t *serf_bucket_bwtp_frame_get_headers( + serf_bucket_t *bucket) +{ + if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { + frame_context_t *ctx = bucket->data; + + return ctx->headers; + } + else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { + incoming_context_t *ctx = bucket->data; + + return ctx->headers; + } + + return NULL; +} + +static int count_size(void *baton, const char *key, const char *value) +{ + apr_size_t *c = baton; + /* TODO Deal with folding. Yikes. */ + + /* Add in ": " and CRLF - so an extra four bytes. */ + *c += strlen(key) + strlen(value) + 4; + + return 0; +} + +static apr_size_t calc_header_size(serf_bucket_t *hdrs) +{ + apr_size_t size = 0; + + serf_bucket_headers_do(hdrs, count_size, &size); + + return size; +} + +static void serialize_data(serf_bucket_t *bucket) +{ + frame_context_t *ctx = bucket->data; + serf_bucket_t *new_bucket; + apr_size_t req_len; + + /* Serialize the request-line and headers into one mother string, + * and wrap a bucket around it. + */ + req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line), + "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n", + (ctx->type ? "BWM" : "BWH"), + ctx->channel, calc_header_size(ctx->headers), + (ctx->open ? "OPEN " : ""), + ctx->phrase); + new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len, + bucket->allocator); + + /* Build up the new bucket structure. + * + * Note that self needs to become an aggregate bucket so that a + * pointer to self still represents the "right" data. + */ + serf_bucket_aggregate_become(bucket); + + /* Insert the two buckets. */ + serf_bucket_aggregate_append(bucket, new_bucket); + serf_bucket_aggregate_append(bucket, ctx->headers); + + /* Our private context is no longer needed, and is not referred to by + * any existing bucket. Toss it. + */ + serf_bucket_mem_free(bucket->allocator, ctx); +} + +static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + /* Seralize our private data into a new aggregate bucket. */ + serialize_data(bucket); + + /* Delegate to the "new" aggregate bucket to do the read. */ + return serf_bucket_read(bucket, requested, data, len); +} + +static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + /* Seralize our private data into a new aggregate bucket. */ + serialize_data(bucket); + + /* Delegate to the "new" aggregate bucket to do the readline. */ + return serf_bucket_readline(bucket, acceptable, found, data, len); +} + +static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, + struct iovec *vecs, + int *vecs_used) +{ + /* Seralize our private data into a new aggregate bucket. */ + serialize_data(bucket); + + /* Delegate to the "new" aggregate bucket to do the read. */ + return serf_bucket_read_iovec(bucket, requested, + vecs_size, vecs, vecs_used); +} + +static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + /* Seralize our private data into a new aggregate bucket. */ + serialize_data(bucket); + + /* Delegate to the "new" aggregate bucket to do the peek. */ + return serf_bucket_peek(bucket, data, len); +} + +const serf_bucket_type_t serf_bucket_type_bwtp_frame = { + "BWTP-FRAME", + serf_bwtp_frame_read, + serf_bwtp_frame_readline, + serf_bwtp_frame_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_bwtp_frame_peek, + serf_default_destroy_and_data, +}; + + +serf_bucket_t *serf_bucket_bwtp_incoming_frame_create( + serf_bucket_t *stream, + serf_bucket_alloc_t *allocator) +{ + incoming_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->stream = stream; + ctx->body = NULL; + ctx->headers = serf_bucket_headers_create(allocator); + ctx->state = STATE_STATUS_LINE; + ctx->length = 0; + ctx->channel = -1; + ctx->phrase = NULL; + + serf_linebuf_init(&ctx->linebuf); + + return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx); +} + +static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket) +{ + incoming_context_t *ctx = bucket->data; + + if (ctx->state != STATE_STATUS_LINE && ctx->phrase) { + serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase); + } + + serf_bucket_destroy(ctx->stream); + if (ctx->body != NULL) + serf_bucket_destroy(ctx->body); + serf_bucket_destroy(ctx->headers); + + serf_default_destroy_and_data(bucket); +} + +static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable) +{ + return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable); +} + +static apr_status_t parse_status_line(incoming_context_t *ctx, + serf_bucket_alloc_t *allocator) +{ + int res; + char *reason; /* ### stupid APR interface makes this non-const */ + + /* ctx->linebuf.line should be of form: BW* */ + res = apr_date_checkmask(ctx->linebuf.line, "BW*"); + if (!res) { + /* Not an BWTP response? Well, at least we won't understand it. */ + return APR_EGENERAL; + } + + if (ctx->linebuf.line[2] == 'H') { + ctx->type = 0; + } + else if (ctx->linebuf.line[2] == 'M') { + ctx->type = 1; + } + else { + ctx->type = -1; + } + + ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16); + + /* Skip leading spaces for the reason string. */ + if (apr_isspace(*reason)) { + reason++; + } + + ctx->length = apr_strtoi64(reason, &reason, 16); + + /* Skip leading spaces for the reason string. */ + if (reason - ctx->linebuf.line < ctx->linebuf.used) { + if (apr_isspace(*reason)) { + reason++; + } + + ctx->phrase = serf_bstrmemdup(allocator, reason, + ctx->linebuf.used + - (reason - ctx->linebuf.line)); + } else { + ctx->phrase = NULL; + } + + return APR_SUCCESS; +} + +/* This code should be replaced with header buckets. */ +static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx) +{ + apr_status_t status; + + /* RFC 2616 says that CRLF is the only line ending, but we can easily + * accept any kind of line ending. + */ + status = fetch_line(ctx, SERF_NEWLINE_ANY); + if (SERF_BUCKET_READ_ERROR(status)) { + return status; + } + /* Something was read. Process it. */ + + if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { + const char *end_key; + const char *c; + + end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used); + if (!c) { + /* Bad headers? */ + return APR_EGENERAL; + } + + /* Skip over initial : and spaces. */ + while (apr_isspace(*++c)) + continue; + + /* Always copy the headers (from the linebuf into new mem). */ + /* ### we should be able to optimize some mem copies */ + serf_bucket_headers_setx( + ctx->headers, + ctx->linebuf.line, end_key - ctx->linebuf.line, 1, + c, ctx->linebuf.line + ctx->linebuf.used - c, 1); + } + + return status; +} + +/* Perform one iteration of the state machine. + * + * Will return when one the following conditions occurred: + * 1) a state change + * 2) an error + * 3) the stream is not ready or at EOF + * 4) APR_SUCCESS, meaning the machine can be run again immediately + */ +static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx) +{ + apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */ + + switch (ctx->state) { + case STATE_STATUS_LINE: + /* RFC 2616 says that CRLF is the only line ending, but we can easily + * accept any kind of line ending. + */ + status = fetch_line(ctx, SERF_NEWLINE_ANY); + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { + /* The Status-Line is in the line buffer. Process it. */ + status = parse_status_line(ctx, bkt->allocator); + if (status) + return status; + + if (ctx->length) { + ctx->body = + serf_bucket_barrier_create(ctx->stream, bkt->allocator); + ctx->body = serf_bucket_limit_create(ctx->body, ctx->length, + bkt->allocator); + if (!ctx->type) { + ctx->state = STATE_HEADERS; + } else { + ctx->state = STATE_BODY; + } + } else { + ctx->state = STATE_DONE; + } + } + else { + /* The connection closed before we could get the next + * response. Treat the request as lost so that our upper + * end knows the server never tried to give us a response. + */ + if (APR_STATUS_IS_EOF(status)) { + return SERF_ERROR_REQUEST_LOST; + } + } + break; + case STATE_HEADERS: + status = fetch_headers(ctx->body, ctx); + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + /* If an empty line was read, then we hit the end of the headers. + * Move on to the body. + */ + if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) { + /* Advance the state. */ + ctx->state = STATE_DONE; + } + break; + case STATE_BODY: + /* Don't do anything. */ + break; + case STATE_DONE: + return APR_EOF; + default: + /* Not reachable */ + return APR_EGENERAL; + } + + return status; +} + +static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx) +{ + apr_status_t status; + + /* Keep reading and moving through states if we aren't at the BODY */ + while (ctx->state != STATE_BODY) { + status = run_machine(bkt, ctx); + + /* Anything other than APR_SUCCESS means that we cannot immediately + * read again (for now). + */ + if (status) + return status; + } + /* in STATE_BODY */ + + return APR_SUCCESS; +} + +apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers( + serf_bucket_t *bucket) +{ + incoming_context_t *ctx = bucket->data; + + return wait_for_body(bucket, ctx); +} + +static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + incoming_context_t *ctx = bucket->data; + apr_status_t rv; + + rv = wait_for_body(bucket, ctx); + if (rv) { + /* It's not possible to have read anything yet! */ + if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) { + *len = 0; + } + return rv; + } + + rv = serf_bucket_read(ctx->body, requested, data, len); + if (APR_STATUS_IS_EOF(rv)) { + ctx->state = STATE_DONE; + } + return rv; +} + +static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + incoming_context_t *ctx = bucket->data; + apr_status_t rv; + + rv = wait_for_body(bucket, ctx); + if (rv) { + return rv; + } + + /* Delegate to the stream bucket to do the readline. */ + return serf_bucket_readline(ctx->body, acceptable, found, data, len); +} + +/* ### need to implement */ +#define bwtp_incoming_peek NULL + +const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = { + "BWTP-INCOMING", + bwtp_incoming_read, + bwtp_incoming_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + bwtp_incoming_peek, + bwtp_incoming_destroy_and_data, +}; diff --git a/buckets/chunk_buckets.c b/buckets/chunk_buckets.c new file mode 100644 index 0000000..7f25508 --- /dev/null +++ b/buckets/chunk_buckets.c @@ -0,0 +1,235 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "serf.h" +#include "serf_bucket_util.h" + + +typedef struct { + enum { + STATE_FETCH, + STATE_CHUNK, + STATE_EOF + } state; + + apr_status_t last_status; + + serf_bucket_t *chunk; + serf_bucket_t *stream; + + char chunk_hdr[20]; +} chunk_context_t; + + +serf_bucket_t *serf_bucket_chunk_create( + serf_bucket_t *stream, serf_bucket_alloc_t *allocator) +{ + chunk_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->state = STATE_FETCH; + ctx->chunk = serf_bucket_aggregate_create(allocator); + ctx->stream = stream; + + return serf_bucket_create(&serf_bucket_type_chunk, allocator, ctx); +} + +#define CRLF "\r\n" + +static apr_status_t create_chunk(serf_bucket_t *bucket) +{ + chunk_context_t *ctx = bucket->data; + serf_bucket_t *simple_bkt; + apr_size_t chunk_len; + apr_size_t stream_len; + struct iovec vecs[66]; /* 64 + chunk trailer + EOF trailer = 66 */ + int vecs_read; + int i; + + if (ctx->state != STATE_FETCH) { + return APR_SUCCESS; + } + + ctx->last_status = + serf_bucket_read_iovec(ctx->stream, SERF_READ_ALL_AVAIL, + 64, vecs, &vecs_read); + + if (SERF_BUCKET_READ_ERROR(ctx->last_status)) { + /* Uh-oh. */ + return ctx->last_status; + } + + /* Count the length of the data we read. */ + stream_len = 0; + for (i = 0; i < vecs_read; i++) { + stream_len += vecs[i].iov_len; + } + + /* assert: stream_len in hex < sizeof(ctx->chunk_hdr) */ + + /* Inserting a 0 byte chunk indicates a terminator, which already happens + * during the EOF handler below. Adding another one here will cause the + * EOF chunk to be interpreted by the server as a new request. So, + * we'll only do this if we have something to write. + */ + if (stream_len) { + /* Build the chunk header. */ + chunk_len = apr_snprintf(ctx->chunk_hdr, sizeof(ctx->chunk_hdr), + "%" APR_UINT64_T_HEX_FMT CRLF, + (apr_uint64_t)stream_len); + + /* Create a copy of the chunk header so we can have multiple chunks + * in the pipeline at the same time. + */ + simple_bkt = serf_bucket_simple_copy_create(ctx->chunk_hdr, chunk_len, + bucket->allocator); + serf_bucket_aggregate_append(ctx->chunk, simple_bkt); + + /* Insert the chunk footer. */ + vecs[vecs_read].iov_base = CRLF; + vecs[vecs_read++].iov_len = sizeof(CRLF) - 1; + } + + /* We've reached the end of the line for the stream. */ + if (APR_STATUS_IS_EOF(ctx->last_status)) { + /* Insert the chunk footer. */ + vecs[vecs_read].iov_base = "0" CRLF CRLF; + vecs[vecs_read++].iov_len = sizeof("0" CRLF CRLF) - 1; + + ctx->state = STATE_EOF; + } + else { + /* Okay, we can return data. */ + ctx->state = STATE_CHUNK; + } + + serf_bucket_aggregate_append_iovec(ctx->chunk, vecs, vecs_read); + + return APR_SUCCESS; +} + +static apr_status_t serf_chunk_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + chunk_context_t *ctx = bucket->data; + apr_status_t status; + + /* Before proceeding, we need to fetch some data from the stream. */ + if (ctx->state == STATE_FETCH) { + status = create_chunk(bucket); + if (status) { + return status; + } + } + + status = serf_bucket_read(ctx->chunk, requested, data, len); + + /* Mask EOF from aggregate bucket. */ + if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) { + status = ctx->last_status; + ctx->state = STATE_FETCH; + } + + return status; +} + +static apr_status_t serf_chunk_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + chunk_context_t *ctx = bucket->data; + apr_status_t status; + + status = serf_bucket_readline(ctx->chunk, acceptable, found, data, len); + + /* Mask EOF from aggregate bucket. */ + if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) { + status = APR_EAGAIN; + ctx->state = STATE_FETCH; + } + + return status; +} + +static apr_status_t serf_chunk_read_iovec(serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, + struct iovec *vecs, + int *vecs_used) +{ + chunk_context_t *ctx = bucket->data; + apr_status_t status; + + /* Before proceeding, we need to fetch some data from the stream. */ + if (ctx->state == STATE_FETCH) { + status = create_chunk(bucket); + if (status) { + return status; + } + } + + status = serf_bucket_read_iovec(ctx->chunk, requested, vecs_size, vecs, + vecs_used); + + /* Mask EOF from aggregate bucket. */ + if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) { + status = ctx->last_status; + ctx->state = STATE_FETCH; + } + + return status; +} + +static apr_status_t serf_chunk_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + chunk_context_t *ctx = bucket->data; + apr_status_t status; + + status = serf_bucket_peek(ctx->chunk, data, len); + + /* Mask EOF from aggregate bucket. */ + if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) { + status = APR_EAGAIN; + } + + return status; +} + +static void serf_chunk_destroy(serf_bucket_t *bucket) +{ + chunk_context_t *ctx = bucket->data; + + serf_bucket_destroy(ctx->stream); + serf_bucket_destroy(ctx->chunk); + + serf_default_destroy_and_data(bucket); +} + +const serf_bucket_type_t serf_bucket_type_chunk = { + "CHUNK", + serf_chunk_read, + serf_chunk_readline, + serf_chunk_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_chunk_peek, + serf_chunk_destroy, +}; diff --git a/buckets/dechunk_buckets.c b/buckets/dechunk_buckets.c new file mode 100644 index 0000000..28dd671 --- /dev/null +++ b/buckets/dechunk_buckets.c @@ -0,0 +1,185 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "serf.h" +#include "serf_bucket_util.h" + +typedef struct { + serf_bucket_t *stream; + + enum { + STATE_SIZE, /* reading the chunk size */ + STATE_CHUNK, /* reading the chunk */ + STATE_TERM, /* reading the chunk terminator */ + STATE_DONE /* body is done; we've returned EOF */ + } state; + + /* Buffer for accumulating a chunk size. */ + serf_linebuf_t linebuf; + + /* How much of the chunk, or the terminator, do we have left to read? */ + apr_int64_t body_left; + +} dechunk_context_t; + + +serf_bucket_t *serf_bucket_dechunk_create( + serf_bucket_t *stream, + serf_bucket_alloc_t *allocator) +{ + dechunk_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->stream = stream; + ctx->state = STATE_SIZE; + + serf_linebuf_init(&ctx->linebuf); + + return serf_bucket_create(&serf_bucket_type_dechunk, allocator, ctx); +} + +static void serf_dechunk_destroy_and_data(serf_bucket_t *bucket) +{ + dechunk_context_t *ctx = bucket->data; + + serf_bucket_destroy(ctx->stream); + + serf_default_destroy_and_data(bucket); +} + +static apr_status_t serf_dechunk_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + dechunk_context_t *ctx = bucket->data; + apr_status_t status; + + while (1) { + switch (ctx->state) { + case STATE_SIZE: + + /* fetch a line terminated by CRLF */ + status = serf_linebuf_fetch(&ctx->linebuf, ctx->stream, + SERF_NEWLINE_CRLF); + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + /* if a line was read, then parse it. */ + if (ctx->linebuf.state == SERF_LINEBUF_READY) { + /* NUL-terminate the line. if it filled the entire buffer, + then just assume the thing is too large. */ + if (ctx->linebuf.used == sizeof(ctx->linebuf.line)) + return APR_FROM_OS_ERROR(ERANGE); + ctx->linebuf.line[ctx->linebuf.used] = '\0'; + + /* convert from HEX digits. */ + ctx->body_left = apr_strtoi64(ctx->linebuf.line, NULL, 16); + if (errno == ERANGE) { + return APR_FROM_OS_ERROR(ERANGE); + } + + if (ctx->body_left == 0) { + /* Just read the last-chunk marker. We're DONE. */ + ctx->state = STATE_DONE; + status = APR_EOF; + } + else { + /* Got a size, so we'll start reading the chunk now. */ + ctx->state = STATE_CHUNK; + } + + /* If we can read more, then go do so. */ + if (!status) + continue; + } + /* assert: status != 0 */ + + /* Note that we didn't actually read anything, so our callers + * don't get confused. + */ + *len = 0; + + return status; + + case STATE_CHUNK: + + if (requested > ctx->body_left) { + requested = ctx->body_left; + } + + /* Delegate to the stream bucket to do the read. */ + status = serf_bucket_read(ctx->stream, requested, data, len); + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + /* Some data was read, so decrement the amount left and see + * if we're done reading this chunk. + */ + ctx->body_left -= *len; + if (!ctx->body_left) { + ctx->state = STATE_TERM; + ctx->body_left = 2; /* CRLF */ + } + + /* Return the data we just read. */ + return status; + + case STATE_TERM: + /* Delegate to the stream bucket to do the read. */ + status = serf_bucket_read(ctx->stream, ctx->body_left, data, len); + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + /* Some data was read, so decrement the amount left and see + * if we're done reading the chunk terminator. + */ + ctx->body_left -= *len; + if (!ctx->body_left) { + ctx->state = STATE_SIZE; + } + + if (status) + return status; + + break; + + case STATE_DONE: + /* Just keep returning EOF */ + return APR_EOF; + + default: + /* Not reachable */ + return APR_EGENERAL; + } + } + /* NOTREACHED */ +} + +/* ### need to implement */ +#define serf_dechunk_readline NULL +#define serf_dechunk_peek NULL + +const serf_bucket_type_t serf_bucket_type_dechunk = { + "DECHUNK", + serf_dechunk_read, + serf_dechunk_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_dechunk_peek, + serf_dechunk_destroy_and_data, +}; diff --git a/buckets/deflate_buckets.c b/buckets/deflate_buckets.c new file mode 100644 index 0000000..7a8e8e4 --- /dev/null +++ b/buckets/deflate_buckets.c @@ -0,0 +1,384 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +/* This conditional isn't defined anywhere yet. */ +#ifdef HAVE_ZUTIL_H +#include +#endif + +#include "serf.h" +#include "serf_bucket_util.h" + +/* magic header */ +static char deflate_magic[2] = { '\037', '\213' }; +#define DEFLATE_MAGIC_SIZE 10 +#define DEFLATE_VERIFY_SIZE 8 +#define DEFLATE_BUFFER_SIZE 8096 + +static const int DEFLATE_WINDOW_SIZE = -15; +static const int DEFLATE_MEMLEVEL = 9; + +typedef struct { + serf_bucket_t *stream; + serf_bucket_t *inflate_stream; + + int format; /* Are we 'deflate' or 'gzip'? */ + + enum { + STATE_READING_HEADER, /* reading the gzip header */ + STATE_HEADER, /* read the gzip header */ + STATE_INIT, /* init'ing zlib functions */ + STATE_INFLATE, /* inflating the content now */ + STATE_READING_VERIFY, /* reading the final gzip CRC */ + STATE_VERIFY, /* verifying the final gzip CRC */ + STATE_FINISH, /* clean up after reading body */ + STATE_DONE, /* body is done; we'll return EOF here */ + } state; + + z_stream zstream; + char hdr_buffer[DEFLATE_MAGIC_SIZE]; + unsigned char buffer[DEFLATE_BUFFER_SIZE]; + unsigned long crc; + int windowSize; + int memLevel; + int bufferSize; + + /* How much of the chunk, or the terminator, do we have left to read? */ + apr_size_t stream_left; + + /* How much are we supposed to read? */ + apr_size_t stream_size; + + int stream_status; /* What was the last status we read? */ + +} deflate_context_t; + +/* Inputs a string and returns a long. */ +static unsigned long getLong(unsigned char *string) +{ + return ((unsigned long)string[0]) + | (((unsigned long)string[1]) << 8) + | (((unsigned long)string[2]) << 16) + | (((unsigned long)string[3]) << 24); +} + +serf_bucket_t *serf_bucket_deflate_create( + serf_bucket_t *stream, + serf_bucket_alloc_t *allocator, + int format) +{ + deflate_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->stream = stream; + ctx->stream_status = APR_SUCCESS; + ctx->inflate_stream = serf_bucket_aggregate_create(allocator); + ctx->format = format; + ctx->crc = 0; + /* zstream must be NULL'd out. */ + memset(&ctx->zstream, 0, sizeof(ctx->zstream)); + + switch (ctx->format) { + case SERF_DEFLATE_GZIP: + ctx->state = STATE_READING_HEADER; + break; + case SERF_DEFLATE_DEFLATE: + /* deflate doesn't have a header. */ + ctx->state = STATE_INIT; + break; + default: + /* Not reachable */ + return NULL; + } + + /* Initial size of gzip header. */ + ctx->stream_left = ctx->stream_size = DEFLATE_MAGIC_SIZE; + + ctx->windowSize = DEFLATE_WINDOW_SIZE; + ctx->memLevel = DEFLATE_MEMLEVEL; + ctx->bufferSize = DEFLATE_BUFFER_SIZE; + + return serf_bucket_create(&serf_bucket_type_deflate, allocator, ctx); +} + +static void serf_deflate_destroy_and_data(serf_bucket_t *bucket) +{ + deflate_context_t *ctx = bucket->data; + + if (ctx->state > STATE_INIT && + ctx->state <= STATE_FINISH) + inflateEnd(&ctx->zstream); + + /* We may have appended inflate_stream into the stream bucket. + * If so, avoid free'ing it twice. + */ + if (ctx->inflate_stream) { + serf_bucket_destroy(ctx->inflate_stream); + } + serf_bucket_destroy(ctx->stream); + + serf_default_destroy_and_data(bucket); +} + +static apr_status_t serf_deflate_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + deflate_context_t *ctx = bucket->data; + unsigned long compCRC, compLen; + apr_status_t status; + const char *private_data; + apr_size_t private_len; + int zRC; + + while (1) { + switch (ctx->state) { + case STATE_READING_HEADER: + case STATE_READING_VERIFY: + status = serf_bucket_read(ctx->stream, ctx->stream_left, + &private_data, &private_len); + + if (SERF_BUCKET_READ_ERROR(status)) { + return status; + } + + memcpy(ctx->hdr_buffer + (ctx->stream_size - ctx->stream_left), + private_data, private_len); + + ctx->stream_left -= private_len; + + if (ctx->stream_left == 0) { + ctx->state++; + if (APR_STATUS_IS_EAGAIN(status)) { + *len = 0; + return status; + } + } + else if (status) { + *len = 0; + return status; + } + break; + case STATE_HEADER: + if (ctx->hdr_buffer[0] != deflate_magic[0] || + ctx->hdr_buffer[1] != deflate_magic[1]) { + return SERF_ERROR_DECOMPRESSION_FAILED; + } + if (ctx->hdr_buffer[3] != 0) { + return SERF_ERROR_DECOMPRESSION_FAILED; + } + ctx->state++; + break; + case STATE_VERIFY: + /* Do the checksum computation. */ + compCRC = getLong((unsigned char*)ctx->hdr_buffer); + if (ctx->crc != compCRC) { + return SERF_ERROR_DECOMPRESSION_FAILED; + } + compLen = getLong((unsigned char*)ctx->hdr_buffer + 4); + if (ctx->zstream.total_out != compLen) { + return SERF_ERROR_DECOMPRESSION_FAILED; + } + ctx->state++; + break; + case STATE_INIT: + zRC = inflateInit2(&ctx->zstream, ctx->windowSize); + if (zRC != Z_OK) { + return SERF_ERROR_DECOMPRESSION_FAILED; + } + ctx->zstream.next_out = ctx->buffer; + ctx->zstream.avail_out = ctx->bufferSize; + ctx->state++; + break; + case STATE_FINISH: + inflateEnd(&ctx->zstream); + serf_bucket_aggregate_prepend(ctx->stream, ctx->inflate_stream); + ctx->inflate_stream = 0; + ctx->state++; + break; + case STATE_INFLATE: + /* Do we have anything already uncompressed to read? */ + status = serf_bucket_read(ctx->inflate_stream, requested, data, + len); + if (SERF_BUCKET_READ_ERROR(status)) { + return status; + } + /* Hide EOF. */ + if (APR_STATUS_IS_EOF(status)) { + status = ctx->stream_status; + if (APR_STATUS_IS_EOF(status)) { + /* We've read all of the data from our stream, but we + * need to continue to iterate until we flush + * out the zlib buffer. + */ + status = APR_SUCCESS; + } + } + if (*len != 0) { + return status; + } + + /* We tried; but we have nothing buffered. Fetch more. */ + + /* It is possible that we maxed out avail_out before + * exhausting avail_in; therefore, continue using the + * previous buffer. Otherwise, fetch more data from + * our stream bucket. + */ + if (ctx->zstream.avail_in == 0) { + /* When we empty our inflated stream, we'll return this + * status - this allow us to eventually pass up EAGAINs. + */ + ctx->stream_status = serf_bucket_read(ctx->stream, + ctx->bufferSize, + &private_data, + &private_len); + + if (SERF_BUCKET_READ_ERROR(ctx->stream_status)) { + return ctx->stream_status; + } + + if (!private_len && APR_STATUS_IS_EAGAIN(ctx->stream_status)) { + *len = 0; + status = ctx->stream_status; + ctx->stream_status = APR_SUCCESS; + return status; + } + + ctx->zstream.next_in = (unsigned char*)private_data; + ctx->zstream.avail_in = private_len; + } + zRC = Z_OK; + while (ctx->zstream.avail_in != 0) { + /* We're full, clear out our buffer, reset, and return. */ + if (ctx->zstream.avail_out == 0) { + serf_bucket_t *tmp; + ctx->zstream.next_out = ctx->buffer; + private_len = ctx->bufferSize - ctx->zstream.avail_out; + + ctx->crc = crc32(ctx->crc, (const Bytef *)ctx->buffer, + private_len); + + /* FIXME: There probably needs to be a free func. */ + tmp = SERF_BUCKET_SIMPLE_STRING_LEN((char *)ctx->buffer, + private_len, + bucket->allocator); + serf_bucket_aggregate_append(ctx->inflate_stream, tmp); + ctx->zstream.avail_out = ctx->bufferSize; + break; + } + zRC = inflate(&ctx->zstream, Z_NO_FLUSH); + + if (zRC == Z_STREAM_END) { + serf_bucket_t *tmp; + + private_len = ctx->bufferSize - ctx->zstream.avail_out; + ctx->crc = crc32(ctx->crc, (const Bytef *)ctx->buffer, + private_len); + /* FIXME: There probably needs to be a free func. */ + tmp = SERF_BUCKET_SIMPLE_STRING_LEN((char *)ctx->buffer, + private_len, + bucket->allocator); + serf_bucket_aggregate_append(ctx->inflate_stream, tmp); + + ctx->zstream.avail_out = ctx->bufferSize; + + /* Push back the remaining data to be read. */ + tmp = serf_bucket_aggregate_create(bucket->allocator); + serf_bucket_aggregate_prepend(tmp, ctx->stream); + ctx->stream = tmp; + + /* We now need to take the remaining avail_in and + * throw it in ctx->stream so our next read picks it up. + */ + tmp = SERF_BUCKET_SIMPLE_STRING_LEN( + (const char*)ctx->zstream.next_in, + ctx->zstream.avail_in, + bucket->allocator); + serf_bucket_aggregate_prepend(ctx->stream, tmp); + + switch (ctx->format) { + case SERF_DEFLATE_GZIP: + ctx->stream_left = ctx->stream_size = + DEFLATE_VERIFY_SIZE; + ctx->state++; + break; + case SERF_DEFLATE_DEFLATE: + /* Deflate does not have a verify footer. */ + ctx->state = STATE_FINISH; + break; + default: + /* Not reachable */ + return APR_EGENERAL; + } + + break; + } + if (zRC != Z_OK) { + return SERF_ERROR_DECOMPRESSION_FAILED; + } + } + /* Okay, we've inflated. Try to read. */ + status = serf_bucket_read(ctx->inflate_stream, requested, data, + len); + /* Hide EOF. */ + if (APR_STATUS_IS_EOF(status)) { + status = ctx->stream_status; + /* If our stream is finished too, return SUCCESS so + * we'll iterate one more time. + */ + if (APR_STATUS_IS_EOF(status)) { + /* No more data to read from the stream, and everything + inflated. If all data was received correctly, state + should have been advanced to STATE_READING_VERIFY or + STATE_FINISH. If not, then the data was incomplete + and we have an error. */ + if (ctx->state != STATE_INFLATE) + return APR_SUCCESS; + else + return SERF_ERROR_DECOMPRESSION_FAILED; + } + } + return status; + case STATE_DONE: + /* We're done inflating. Use our finished buffer. */ + return serf_bucket_read(ctx->stream, requested, data, len); + default: + /* Not reachable */ + return APR_EGENERAL; + } + } + + /* NOTREACHED */ +} + +/* ### need to implement */ +#define serf_deflate_readline NULL +#define serf_deflate_peek NULL + +const serf_bucket_type_t serf_bucket_type_deflate = { + "DEFLATE", + serf_deflate_read, + serf_deflate_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_deflate_peek, + serf_deflate_destroy_and_data, +}; diff --git a/buckets/file_buckets.c b/buckets/file_buckets.c new file mode 100644 index 0000000..bd41cab --- /dev/null +++ b/buckets/file_buckets.c @@ -0,0 +1,117 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "serf.h" +#include "serf_bucket_util.h" + +typedef struct { + apr_file_t *file; + + serf_databuf_t databuf; + +} file_context_t; + + +static apr_status_t file_reader(void *baton, apr_size_t bufsize, + char *buf, apr_size_t *len) +{ + file_context_t *ctx = baton; + + *len = bufsize; + return apr_file_read(ctx->file, buf, len); +} + +serf_bucket_t *serf_bucket_file_create( + apr_file_t *file, + serf_bucket_alloc_t *allocator) +{ + file_context_t *ctx; +#if APR_HAS_MMAP + apr_finfo_t finfo; + const char *file_path; + + /* See if we'd be better off mmap'ing this file instead. + * + * Note that there is a failure case here that we purposely fall through: + * if a file is buffered, apr_mmap will reject it. However, on older + * versions of APR, we have no way of knowing this - but apr_mmap_create + * will check for this and return APR_EBADF. + */ + apr_file_name_get(&file_path, file); + apr_stat(&finfo, file_path, APR_FINFO_SIZE, + serf_bucket_allocator_get_pool(allocator)); + if (APR_MMAP_CANDIDATE(finfo.size)) { + apr_status_t status; + apr_mmap_t *file_mmap; + status = apr_mmap_create(&file_mmap, file, 0, finfo.size, + APR_MMAP_READ, + serf_bucket_allocator_get_pool(allocator)); + + if (status == APR_SUCCESS) { + return serf_bucket_mmap_create(file_mmap, allocator); + } + } +#endif + + /* Oh, well. */ + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->file = file; + + serf_databuf_init(&ctx->databuf); + ctx->databuf.read = file_reader; + ctx->databuf.read_baton = ctx; + + return serf_bucket_create(&serf_bucket_type_file, allocator, ctx); +} + +static apr_status_t serf_file_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + file_context_t *ctx = bucket->data; + + return serf_databuf_read(&ctx->databuf, requested, data, len); +} + +static apr_status_t serf_file_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + file_context_t *ctx = bucket->data; + + return serf_databuf_readline(&ctx->databuf, acceptable, found, data, len); +} + +static apr_status_t serf_file_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + file_context_t *ctx = bucket->data; + + return serf_databuf_peek(&ctx->databuf, data, len); +} + +const serf_bucket_type_t serf_bucket_type_file = { + "FILE", + serf_file_read, + serf_file_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_file_peek, + serf_default_destroy_and_data, +}; diff --git a/buckets/headers_buckets.c b/buckets/headers_buckets.c new file mode 100644 index 0000000..8bf91b4 --- /dev/null +++ b/buckets/headers_buckets.c @@ -0,0 +1,429 @@ +/* Copyright 2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include /* for strcasecmp() */ + +#include "serf.h" +#include "serf_bucket_util.h" + + +typedef struct header_list { + const char *header; + const char *value; + + apr_size_t header_size; + apr_size_t value_size; + + int alloc_flags; +#define ALLOC_HEADER 0x0001 /* header lives in our allocator */ +#define ALLOC_VALUE 0x0002 /* value lives in our allocator */ + + struct header_list *next; +} header_list_t; + +typedef struct { + header_list_t *list; + + header_list_t *cur_read; + enum { + READ_START, /* haven't started reading yet */ + READ_HEADER, /* reading cur_read->header */ + READ_SEP, /* reading ": " */ + READ_VALUE, /* reading cur_read->value */ + READ_CRLF, /* reading "\r\n" */ + READ_TERM, /* reading the final "\r\n" */ + READ_DONE /* no more data to read */ + } state; + apr_size_t amt_read; /* how much of the current state we've read */ + +} headers_context_t; + + +serf_bucket_t *serf_bucket_headers_create( + serf_bucket_alloc_t *allocator) +{ + headers_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->list = NULL; + ctx->state = READ_START; + + return serf_bucket_create(&serf_bucket_type_headers, allocator, ctx); +} + +void serf_bucket_headers_setx( + serf_bucket_t *bkt, + const char *header, apr_size_t header_size, int header_copy, + const char *value, apr_size_t value_size, int value_copy) +{ + headers_context_t *ctx = bkt->data; + header_list_t *iter = ctx->list; + header_list_t *hdr; + +#if 0 + /* ### include this? */ + if (ctx->cur_read) { + /* we started reading. can't change now. */ + abort(); + } +#endif + + hdr = serf_bucket_mem_alloc(bkt->allocator, sizeof(*hdr)); + hdr->header_size = header_size; + hdr->value_size = value_size; + hdr->alloc_flags = 0; + hdr->next = NULL; + + if (header_copy) { + hdr->header = serf_bstrmemdup(bkt->allocator, header, header_size); + hdr->alloc_flags |= ALLOC_HEADER; + } + else { + hdr->header = header; + } + + if (value_copy) { + hdr->value = serf_bstrmemdup(bkt->allocator, value, value_size); + hdr->alloc_flags |= ALLOC_VALUE; + } + else { + hdr->value = value; + } + + /* Add the new header at the end of the list. */ + while (iter && iter->next) { + iter = iter->next; + } + if (iter) + iter->next = hdr; + else + ctx->list = hdr; +} + +void serf_bucket_headers_set( + serf_bucket_t *headers_bucket, + const char *header, + const char *value) +{ + serf_bucket_headers_setx(headers_bucket, + header, strlen(header), 0, + value, strlen(value), 1); +} + +void serf_bucket_headers_setc( + serf_bucket_t *headers_bucket, + const char *header, + const char *value) +{ + serf_bucket_headers_setx(headers_bucket, + header, strlen(header), 1, + value, strlen(value), 1); +} + +void serf_bucket_headers_setn( + serf_bucket_t *headers_bucket, + const char *header, + const char *value) +{ + serf_bucket_headers_setx(headers_bucket, + header, strlen(header), 0, + value, strlen(value), 0); +} + +const char *serf_bucket_headers_get( + serf_bucket_t *headers_bucket, + const char *header) +{ + headers_context_t *ctx = headers_bucket->data; + header_list_t *found = ctx->list; + const char *val = NULL; + int value_size = 0; + int val_alloc = 0; + + while (found) { + if (strcasecmp(found->header, header) == 0) { + if (val) { + /* The header is already present. RFC 2616, section 4.2 + indicates that we should append the new value, separated by + a comma. Reasoning: for headers whose values are known to + be comma-separated, that is clearly the correct behavior; + for others, the correct behavior is undefined anyway. */ + + /* The "+1" is for the comma; serf_bstrmemdup() will also add + one slot for the terminating '\0'. */ + apr_size_t new_size = found->value_size + value_size + 1; + char *new_val = serf_bucket_mem_alloc(headers_bucket->allocator, + new_size); + memcpy(new_val, val, value_size); + new_val[value_size] = ','; + memcpy(new_val + value_size + 1, found->value, + found->value_size); + new_val[new_size] = '\0'; + /* Copy the new value over the already existing value. */ + if (val_alloc) + serf_bucket_mem_free(headers_bucket->allocator, (void*)val); + val_alloc |= ALLOC_VALUE; + val = new_val; + value_size = new_size; + } + else { + val = found->value; + value_size = found->value_size; + } + } + found = found->next; + } + + return val; +} + +void serf_bucket_headers_do( + serf_bucket_t *headers_bucket, + serf_bucket_headers_do_callback_fn_t func, + void *baton) +{ + headers_context_t *ctx = headers_bucket->data; + header_list_t *scan = ctx->list; + + while (scan) { + if (func(baton, scan->header, scan->value) != 0) { + break; + } + scan = scan->next; + } +} + +static void serf_headers_destroy_and_data(serf_bucket_t *bucket) +{ + headers_context_t *ctx = bucket->data; + header_list_t *scan = ctx->list; + + while (scan) { + header_list_t *next_hdr = scan->next; + + if (scan->alloc_flags & ALLOC_HEADER) + serf_bucket_mem_free(bucket->allocator, (void *)scan->header); + if (scan->alloc_flags & ALLOC_VALUE) + serf_bucket_mem_free(bucket->allocator, (void *)scan->value); + serf_bucket_mem_free(bucket->allocator, scan); + + scan = next_hdr; + } + + serf_default_destroy_and_data(bucket); +} + +static void select_value( + headers_context_t *ctx, + const char **value, + apr_size_t *len) +{ + const char *v; + apr_size_t l; + + if (ctx->state == READ_START) { + if (ctx->list == NULL) { + /* No headers. Move straight to the TERM state. */ + ctx->state = READ_TERM; + } + else { + ctx->state = READ_HEADER; + ctx->cur_read = ctx->list; + } + ctx->amt_read = 0; + } + + switch (ctx->state) { + case READ_HEADER: + v = ctx->cur_read->header; + l = ctx->cur_read->header_size; + break; + case READ_SEP: + v = ": "; + l = 2; + break; + case READ_VALUE: + v = ctx->cur_read->value; + l = ctx->cur_read->value_size; + break; + case READ_CRLF: + case READ_TERM: + v = "\r\n"; + l = 2; + break; + case READ_DONE: + *len = 0; + return; + default: + /* Not reachable */ + return; + } + + *value = v + ctx->amt_read; + *len = l - ctx->amt_read; +} + +/* the current data chunk has been read/consumed. move our internal state. */ +static apr_status_t consume_chunk(headers_context_t *ctx) +{ + /* move to the next state, resetting the amount read. */ + ++ctx->state; + ctx->amt_read = 0; + + /* just sent the terminator and moved to DONE. signal completion. */ + if (ctx->state == READ_DONE) + return APR_EOF; + + /* end of this header. move to the next one. */ + if (ctx->state == READ_TERM) { + ctx->cur_read = ctx->cur_read->next; + if (ctx->cur_read != NULL) { + /* We've got another head to send. Reset the read state. */ + ctx->state = READ_HEADER; + } + /* else leave in READ_TERM */ + } + + /* there is more data which can be read immediately. */ + return APR_SUCCESS; +} + +static apr_status_t serf_headers_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + headers_context_t *ctx = bucket->data; + + select_value(ctx, data, len); + + /* already done or returning the CRLF terminator? return EOF */ + if (ctx->state == READ_DONE || ctx->state == READ_TERM) + return APR_EOF; + + return APR_SUCCESS; +} + +static apr_status_t serf_headers_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + headers_context_t *ctx = bucket->data; + apr_size_t avail; + + select_value(ctx, data, &avail); + if (ctx->state == READ_DONE) + return APR_EOF; + + if (requested >= avail) { + /* return everything from this chunk */ + *len = avail; + + /* we consumed this chunk. advance the state. */ + return consume_chunk(ctx); + } + + /* return just the amount requested, and advance our pointer */ + *len = requested; + ctx->amt_read += requested; + + /* there is more that can be read immediately */ + return APR_SUCCESS; +} + +static apr_status_t serf_headers_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + headers_context_t *ctx = bucket->data; + apr_status_t status; + + /* ### what behavior should we use here? APR_EGENERAL for now */ + if ((acceptable & SERF_NEWLINE_CRLF) == 0) + return APR_EGENERAL; + + /* get whatever is in this chunk */ + select_value(ctx, data, len); + if (ctx->state == READ_DONE) + return APR_EOF; + + /* we consumed this chunk. advance the state. */ + status = consume_chunk(ctx); + + /* the type of newline found is easy... */ + *found = (ctx->state == READ_CRLF || ctx->state == READ_TERM) + ? SERF_NEWLINE_CRLF : SERF_NEWLINE_NONE; + + return status; +} + +static apr_status_t serf_headers_read_iovec(serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, + struct iovec *vecs, + int *vecs_used) +{ + apr_size_t avail = requested; + int i; + + *vecs_used = 0; + + for (i = 0; i < vecs_size; i++) { + const char *data; + apr_size_t len; + apr_status_t status; + + /* Calling read() would not be a safe opt in the general case, but it + * is here for the header bucket as it only frees all of the header + * keys and values when the entire bucket goes away - not on a + * per-read() basis as is normally the case. + */ + status = serf_headers_read(bucket, avail, &data, &len); + + if (len) { + vecs[*vecs_used].iov_base = (char*)data; + vecs[*vecs_used].iov_len = len; + + (*vecs_used)++; + + if (avail != SERF_READ_ALL_AVAIL) { + avail -= len; + + /* If we reach 0, then read()'s status will suffice. */ + if (avail == 0) { + return status; + } + } + } + + if (status) { + return status; + } + } + + return APR_SUCCESS; +} + +const serf_bucket_type_t serf_bucket_type_headers = { + "HEADERS", + serf_headers_read, + serf_headers_readline, + serf_headers_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_headers_peek, + serf_headers_destroy_and_data, +}; diff --git a/buckets/iovec_buckets.c b/buckets/iovec_buckets.c new file mode 100644 index 0000000..9ac1d8d --- /dev/null +++ b/buckets/iovec_buckets.c @@ -0,0 +1,169 @@ +/* Copyright 2011 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "serf.h" +#include "serf_bucket_util.h" + + +typedef struct { + struct iovec *vecs; + + /* Total number of buffer stored in the vecs var. */ + int vecs_len; + /* Points to the first unread buffer. */ + int current_vec; + /* First buffer offset. */ + int offset; +} iovec_context_t; + +serf_bucket_t *serf_bucket_iovec_create( + struct iovec vecs[], + int len, + serf_bucket_alloc_t *allocator) +{ + iovec_context_t *ctx; + int i; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->vecs = serf_bucket_mem_alloc(allocator, len * sizeof(struct iovec)); + ctx->vecs_len = len; + ctx->current_vec = 0; + ctx->offset = 0; + + /* copy all buffers to our iovec. */ + for (i = 0; i < len; i++) { + ctx->vecs[i].iov_base = vecs[i].iov_base; + ctx->vecs[i].iov_len = vecs[i].iov_len; + } + + return serf_bucket_create(&serf_bucket_type_iovec, allocator, ctx); +} + +static apr_status_t serf_iovec_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + return APR_ENOTIMPL; +} + +static apr_status_t serf_iovec_read_iovec(serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, + struct iovec *vecs, + int *vecs_used) +{ + iovec_context_t *ctx = bucket->data; + + *vecs_used = 0; + + /* copy the requested amount of buffers to the provided iovec. */ + for (; ctx->current_vec < ctx->vecs_len; ctx->current_vec++) { + struct iovec vec = ctx->vecs[ctx->current_vec]; + apr_size_t remaining; + + if (requested != SERF_READ_ALL_AVAIL && requested <= 0) + break; + if (*vecs_used >= vecs_size) + break; + + vecs[*vecs_used].iov_base = (char*)vec.iov_base + ctx->offset; + remaining = vec.iov_len - ctx->offset; + + /* Less bytes requested than remaining in the current buffer. */ + if (requested != SERF_READ_ALL_AVAIL && requested < remaining) { + vecs[*vecs_used].iov_len = requested; + ctx->offset += requested; + requested = 0; + (*vecs_used)++; + break; + } else { + /* Copy the complete buffer. */ + vecs[*vecs_used].iov_len = remaining; + ctx->offset = 0; + if (requested != SERF_READ_ALL_AVAIL) + requested -= remaining; + (*vecs_used)++; + } + } + + if (ctx->current_vec == ctx->vecs_len && !ctx->offset) + return APR_EOF; + + return APR_SUCCESS; +} + +static apr_status_t serf_iovec_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + struct iovec vec[1]; + apr_status_t status; + int vecs_used; + + status = serf_iovec_read_iovec(bucket, requested, 1, vec, &vecs_used); + + if (vecs_used) { + *data = vec[0].iov_base; + *len = vec[0].iov_len; + } else { + *len = 0; + } + + return status; +} + +static apr_status_t serf_iovec_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + iovec_context_t *ctx = bucket->data; + + if (ctx->current_vec >= ctx->vecs_len) { + *len = 0; + return APR_EOF; + } + + /* Return the first unread buffer, don't bother combining all + remaining data. */ + *data = ctx->vecs[ctx->current_vec].iov_base; + *len = ctx->vecs[ctx->current_vec].iov_len; + + if (ctx->current_vec + 1 == ctx->vecs_len) + return APR_EOF; + + return APR_SUCCESS; +} + +static void serf_iovec_destroy(serf_bucket_t *bucket) +{ + iovec_context_t *ctx = bucket->data; + + serf_bucket_mem_free(bucket->allocator, ctx->vecs); + serf_default_destroy_and_data(bucket); +} + + +const serf_bucket_type_t serf_bucket_type_iovec = { + "IOVEC", + serf_iovec_read, + serf_iovec_readline, + serf_iovec_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_iovec_peek, + serf_iovec_destroy, +}; diff --git a/buckets/limit_buckets.c b/buckets/limit_buckets.c new file mode 100644 index 0000000..d2e6166 --- /dev/null +++ b/buckets/limit_buckets.c @@ -0,0 +1,134 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "serf.h" +#include "serf_bucket_util.h" + +/* Older versions of APR do not have this macro. */ +#ifdef APR_SIZE_MAX +#define REQUESTED_MAX APR_SIZE_MAX +#else +#define REQUESTED_MAX (~((apr_size_t)0)) +#endif + + +typedef struct { + serf_bucket_t *stream; + apr_uint64_t remaining; +} limit_context_t; + + +serf_bucket_t *serf_bucket_limit_create( + serf_bucket_t *stream, apr_uint64_t len, serf_bucket_alloc_t *allocator) +{ + limit_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->stream = stream; + ctx->remaining = len; + + return serf_bucket_create(&serf_bucket_type_limit, allocator, ctx); +} + +static apr_status_t serf_limit_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + limit_context_t *ctx = bucket->data; + apr_status_t status; + + if (!ctx->remaining) { + *len = 0; + return APR_EOF; + } + + if (requested == SERF_READ_ALL_AVAIL || requested > ctx->remaining) { + if (ctx->remaining <= REQUESTED_MAX) { + requested = (apr_size_t) ctx->remaining; + } else { + requested = REQUESTED_MAX; + } + } + + status = serf_bucket_read(ctx->stream, requested, data, len); + + if (!SERF_BUCKET_READ_ERROR(status)) { + ctx->remaining -= *len; + } + + /* If we have met our limit and don't have a status, return EOF. */ + if (!ctx->remaining && !status) { + status = APR_EOF; + } + + return status; +} + +static apr_status_t serf_limit_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + limit_context_t *ctx = bucket->data; + apr_status_t status; + + if (!ctx->remaining) { + *len = 0; + return APR_EOF; + } + + status = serf_bucket_readline(ctx->stream, acceptable, found, data, len); + + if (!SERF_BUCKET_READ_ERROR(status)) { + ctx->remaining -= *len; + } + + /* If we have met our limit and don't have a status, return EOF. */ + if (!ctx->remaining && !status) { + status = APR_EOF; + } + + return status; +} + +static apr_status_t serf_limit_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + limit_context_t *ctx = bucket->data; + + return serf_bucket_peek(ctx->stream, data, len); +} + +static void serf_limit_destroy(serf_bucket_t *bucket) +{ + limit_context_t *ctx = bucket->data; + + serf_bucket_destroy(ctx->stream); + + serf_default_destroy_and_data(bucket); +} + +const serf_bucket_type_t serf_bucket_type_limit = { + "LIMIT", + serf_limit_read, + serf_limit_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_limit_peek, + serf_limit_destroy, +}; diff --git a/buckets/mmap_buckets.c b/buckets/mmap_buckets.c new file mode 100644 index 0000000..f55a76b --- /dev/null +++ b/buckets/mmap_buckets.c @@ -0,0 +1,118 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "serf.h" +#include "serf_bucket_util.h" + + +typedef struct { + apr_mmap_t *mmap; + void *current; + apr_off_t offset; + apr_off_t remaining; +} mmap_context_t; + + +serf_bucket_t *serf_bucket_mmap_create( + apr_mmap_t *file_mmap, + serf_bucket_alloc_t *allocator) +{ + mmap_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->mmap = file_mmap; + ctx->current = NULL; + ctx->offset = 0; + ctx->remaining = ctx->mmap->size; + + return serf_bucket_create(&serf_bucket_type_mmap, allocator, ctx); +} + +static apr_status_t serf_mmap_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + mmap_context_t *ctx = bucket->data; + + if (requested == SERF_READ_ALL_AVAIL || requested > ctx->remaining) { + *len = ctx->remaining; + } + else { + *len = requested; + } + + /* ### Would it be faster to call this once and do the offset ourselves? */ + apr_mmap_offset((void**)data, ctx->mmap, ctx->offset); + + /* For the next read... */ + ctx->offset += *len; + ctx->remaining -= *len; + + if (ctx->remaining == 0) { + return APR_EOF; + } + return APR_SUCCESS; +} + +static apr_status_t serf_mmap_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + mmap_context_t *ctx = bucket->data; + const char *end; + + /* ### Would it be faster to call this once and do the offset ourselves? */ + apr_mmap_offset((void**)data, ctx->mmap, ctx->offset); + end = *data; + + /* XXX An overflow is generated if we pass &ctx->remaining to readline. + * Not real clear why. + */ + *len = ctx->remaining; + + serf_util_readline(&end, len, acceptable, found); + + *len = end - *data; + + ctx->offset += *len; + ctx->remaining -= *len; + + if (ctx->remaining == 0) { + return APR_EOF; + } + return APR_SUCCESS; +} + +static apr_status_t serf_mmap_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + /* Oh, bah. */ + return APR_ENOTIMPL; +} + +const serf_bucket_type_t serf_bucket_type_mmap = { + "MMAP", + serf_mmap_read, + serf_mmap_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_mmap_peek, + serf_default_destroy_and_data, +}; diff --git a/buckets/request_buckets.c b/buckets/request_buckets.c new file mode 100644 index 0000000..be010c0 --- /dev/null +++ b/buckets/request_buckets.c @@ -0,0 +1,228 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "serf.h" +#include "serf_bucket_util.h" + + +typedef struct { + const char *method; + const char *uri; + serf_bucket_t *headers; + serf_bucket_t *body; + apr_int64_t len; +} request_context_t; + +#define LENGTH_UNKNOWN ((apr_int64_t)-1) + + +serf_bucket_t *serf_bucket_request_create( + const char *method, + const char *URI, + serf_bucket_t *body, + serf_bucket_alloc_t *allocator) +{ + request_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->method = method; + ctx->uri = URI; + ctx->headers = serf_bucket_headers_create(allocator); + ctx->body = body; + ctx->len = LENGTH_UNKNOWN; + + return serf_bucket_create(&serf_bucket_type_request, allocator, ctx); +} + +void serf_bucket_request_set_CL( + serf_bucket_t *bucket, + apr_int64_t len) +{ + request_context_t *ctx = (request_context_t *)bucket->data; + + ctx->len = len; +} + +serf_bucket_t *serf_bucket_request_get_headers( + serf_bucket_t *bucket) +{ + return ((request_context_t *)bucket->data)->headers; +} + +void serf_bucket_request_set_root( + serf_bucket_t *bucket, + const char *root_url) +{ + request_context_t *ctx = (request_context_t *)bucket->data; + + /* If uri is already absolute, don't change it. */ + if (ctx->uri[0] != '/') + return; + + /* If uri is '/' replace it with root_url. */ + if (ctx->uri[1] == '\0') + ctx->uri = root_url; + else + ctx->uri = + apr_pstrcat(serf_bucket_allocator_get_pool(bucket->allocator), + root_url, + ctx->uri, + NULL); +} + +static void serialize_data(serf_bucket_t *bucket) +{ + request_context_t *ctx = bucket->data; + serf_bucket_t *new_bucket; + const char *new_data; + struct iovec iov[4]; + apr_size_t nbytes; + + /* Serialize the request-line and headers into one mother string, + * and wrap a bucket around it. + */ + iov[0].iov_base = (char*)ctx->method; + iov[0].iov_len = strlen(ctx->method); + iov[1].iov_base = " "; + iov[1].iov_len = sizeof(" ") - 1; + iov[2].iov_base = (char*)ctx->uri; + iov[2].iov_len = strlen(ctx->uri); + iov[3].iov_base = " HTTP/1.1\r\n"; + iov[3].iov_len = sizeof(" HTTP/1.1\r\n") - 1; + + /* ### pool allocation! */ + new_data = apr_pstrcatv(serf_bucket_allocator_get_pool(bucket->allocator), + iov, 4, &nbytes); + + /* Create a new bucket for this string. A free function isn't needed + * since the string is residing in a pool. + */ + new_bucket = SERF_BUCKET_SIMPLE_STRING_LEN(new_data, nbytes, + bucket->allocator); + + /* Build up the new bucket structure. + * + * Note that self needs to become an aggregate bucket so that a + * pointer to self still represents the "right" data. + */ + serf_bucket_aggregate_become(bucket); + + /* Insert the two buckets. */ + serf_bucket_aggregate_append(bucket, new_bucket); + serf_bucket_aggregate_append(bucket, ctx->headers); + + /* If we know the length, then use C-L and the raw body. Otherwise, + use chunked encoding for the request. */ + if (ctx->len != LENGTH_UNKNOWN) { + char buf[30]; + sprintf(buf, "%" APR_INT64_T_FMT, ctx->len); + serf_bucket_headers_set(ctx->headers, "Content-Length", buf); + if (ctx->body != NULL) + serf_bucket_aggregate_append(bucket, ctx->body); + } + else if (ctx->body != NULL) { + /* Morph the body bucket to a chunked encoding bucket for now. */ + serf_bucket_headers_setn(ctx->headers, "Transfer-Encoding", "chunked"); + ctx->body = serf_bucket_chunk_create(ctx->body, bucket->allocator); + serf_bucket_aggregate_append(bucket, ctx->body); + } + + /* Our private context is no longer needed, and is not referred to by + * any existing bucket. Toss it. + */ + serf_bucket_mem_free(bucket->allocator, ctx); +} + +static apr_status_t serf_request_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + /* Seralize our private data into a new aggregate bucket. */ + serialize_data(bucket); + + /* Delegate to the "new" aggregate bucket to do the read. */ + return serf_bucket_read(bucket, requested, data, len); +} + +static apr_status_t serf_request_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + /* Seralize our private data into a new aggregate bucket. */ + serialize_data(bucket); + + /* Delegate to the "new" aggregate bucket to do the readline. */ + return serf_bucket_readline(bucket, acceptable, found, data, len); +} + +static apr_status_t serf_request_read_iovec(serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, + struct iovec *vecs, + int *vecs_used) +{ + /* Seralize our private data into a new aggregate bucket. */ + serialize_data(bucket); + + /* Delegate to the "new" aggregate bucket to do the read. */ + return serf_bucket_read_iovec(bucket, requested, + vecs_size, vecs, vecs_used); +} + +static apr_status_t serf_request_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + /* Seralize our private data into a new aggregate bucket. */ + serialize_data(bucket); + + /* Delegate to the "new" aggregate bucket to do the peek. */ + return serf_bucket_peek(bucket, data, len); +} + +void serf_bucket_request_become( + serf_bucket_t *bucket, + const char *method, + const char *uri, + serf_bucket_t *body) +{ + request_context_t *ctx; + + ctx = serf_bucket_mem_alloc(bucket->allocator, sizeof(*ctx)); + ctx->method = method; + ctx->uri = uri; + ctx->headers = serf_bucket_headers_create(bucket->allocator); + ctx->body = body; + + bucket->type = &serf_bucket_type_request; + bucket->data = ctx; + + /* The allocator remains the same. */ +} + +const serf_bucket_type_t serf_bucket_type_request = { + "REQUEST", + serf_request_read, + serf_request_readline, + serf_request_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_request_peek, + serf_default_destroy_and_data, +}; + diff --git a/buckets/response_buckets.c b/buckets/response_buckets.c new file mode 100644 index 0000000..975fedc --- /dev/null +++ b/buckets/response_buckets.c @@ -0,0 +1,429 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "serf.h" +#include "serf_bucket_util.h" + + +typedef struct { + serf_bucket_t *stream; + serf_bucket_t *body; /* Pointer to the stream wrapping the body. */ + serf_bucket_t *headers; /* holds parsed headers */ + + enum { + STATE_STATUS_LINE, /* reading status line */ + STATE_HEADERS, /* reading headers */ + STATE_BODY, /* reading body */ + STATE_TRAILERS, /* reading trailers */ + STATE_DONE /* we've sent EOF */ + } state; + + /* Buffer for accumulating a line from the response. */ + serf_linebuf_t linebuf; + + serf_status_line sl; + + int chunked; /* Do we need to read trailers? */ + int head_req; /* Was this a HEAD request? */ +} response_context_t; + + +serf_bucket_t *serf_bucket_response_create( + serf_bucket_t *stream, + serf_bucket_alloc_t *allocator) +{ + response_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->stream = stream; + ctx->body = NULL; + ctx->headers = serf_bucket_headers_create(allocator); + ctx->state = STATE_STATUS_LINE; + ctx->chunked = 0; + ctx->head_req = 0; + + serf_linebuf_init(&ctx->linebuf); + + return serf_bucket_create(&serf_bucket_type_response, allocator, ctx); +} + +void serf_bucket_response_set_head( + serf_bucket_t *bucket) +{ + response_context_t *ctx = bucket->data; + + ctx->head_req = 1; +} + +serf_bucket_t *serf_bucket_response_get_headers( + serf_bucket_t *bucket) +{ + return ((response_context_t *)bucket->data)->headers; +} + + +static void serf_response_destroy_and_data(serf_bucket_t *bucket) +{ + response_context_t *ctx = bucket->data; + + if (ctx->state != STATE_STATUS_LINE) { + serf_bucket_mem_free(bucket->allocator, (void*)ctx->sl.reason); + } + + serf_bucket_destroy(ctx->stream); + if (ctx->body != NULL) + serf_bucket_destroy(ctx->body); + serf_bucket_destroy(ctx->headers); + + serf_default_destroy_and_data(bucket); +} + +static apr_status_t fetch_line(response_context_t *ctx, int acceptable) +{ + return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable); +} + +static apr_status_t parse_status_line(response_context_t *ctx, + serf_bucket_alloc_t *allocator) +{ + int res; + char *reason; /* ### stupid APR interface makes this non-const */ + + /* ctx->linebuf.line should be of form: HTTP/1.1 200 OK */ + res = apr_date_checkmask(ctx->linebuf.line, "HTTP/#.# ###*"); + if (!res) { + /* Not an HTTP response? Well, at least we won't understand it. */ + return SERF_ERROR_BAD_HTTP_RESPONSE; + } + + ctx->sl.version = SERF_HTTP_VERSION(ctx->linebuf.line[5] - '0', + ctx->linebuf.line[7] - '0'); + ctx->sl.code = apr_strtoi64(ctx->linebuf.line + 8, &reason, 10); + + /* Skip leading spaces for the reason string. */ + if (apr_isspace(*reason)) { + reason++; + } + + /* Copy the reason value out of the line buffer. */ + ctx->sl.reason = serf_bstrmemdup(allocator, reason, + ctx->linebuf.used + - (reason - ctx->linebuf.line)); + + return APR_SUCCESS; +} + +/* This code should be replaced with header buckets. */ +static apr_status_t fetch_headers(serf_bucket_t *bkt, response_context_t *ctx) +{ + apr_status_t status; + + /* RFC 2616 says that CRLF is the only line ending, but we can easily + * accept any kind of line ending. + */ + status = fetch_line(ctx, SERF_NEWLINE_ANY); + if (SERF_BUCKET_READ_ERROR(status)) { + return status; + } + /* Something was read. Process it. */ + + if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { + const char *end_key; + const char *c; + + end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used); + if (!c) { + /* Bad headers? */ + return SERF_ERROR_BAD_HTTP_RESPONSE; + } + + /* Skip over initial ':' */ + c++; + + /* And skip all whitespaces. */ + for(; c < ctx->linebuf.line + ctx->linebuf.used; c++) + { + if (!apr_isspace(*c)) + { + break; + } + } + + /* Always copy the headers (from the linebuf into new mem). */ + /* ### we should be able to optimize some mem copies */ + serf_bucket_headers_setx( + ctx->headers, + ctx->linebuf.line, end_key - ctx->linebuf.line, 1, + c, ctx->linebuf.line + ctx->linebuf.used - c, 1); + } + + return status; +} + +/* Perform one iteration of the state machine. + * + * Will return when one the following conditions occurred: + * 1) a state change + * 2) an error + * 3) the stream is not ready or at EOF + * 4) APR_SUCCESS, meaning the machine can be run again immediately + */ +static apr_status_t run_machine(serf_bucket_t *bkt, response_context_t *ctx) +{ + apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */ + + switch (ctx->state) { + case STATE_STATUS_LINE: + /* RFC 2616 says that CRLF is the only line ending, but we can easily + * accept any kind of line ending. + */ + status = fetch_line(ctx, SERF_NEWLINE_ANY); + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + if (ctx->linebuf.state == SERF_LINEBUF_READY) { + /* The Status-Line is in the line buffer. Process it. */ + status = parse_status_line(ctx, bkt->allocator); + if (status) + return status; + + /* Good times ahead: we're switching protocols! */ + if (ctx->sl.code == 101) { + ctx->body = + serf_bucket_barrier_create(ctx->stream, bkt->allocator); + ctx->state = STATE_DONE; + break; + } + + /* Okay... move on to reading the headers. */ + ctx->state = STATE_HEADERS; + } + else { + /* The connection closed before we could get the next + * response. Treat the request as lost so that our upper + * end knows the server never tried to give us a response. + */ + if (APR_STATUS_IS_EOF(status)) { + return SERF_ERROR_REQUEST_LOST; + } + } + break; + case STATE_HEADERS: + status = fetch_headers(bkt, ctx); + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + /* If an empty line was read, then we hit the end of the headers. + * Move on to the body. + */ + if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) { + const void *v; + + /* Advance the state. */ + ctx->state = STATE_BODY; + + ctx->body = + serf_bucket_barrier_create(ctx->stream, bkt->allocator); + + /* Are we C-L, chunked, or conn close? */ + v = serf_bucket_headers_get(ctx->headers, "Content-Length"); + if (v) { + apr_uint64_t length; + length = apr_strtoi64(v, NULL, 10); + if (errno == ERANGE) { + return APR_FROM_OS_ERROR(ERANGE); + } + ctx->body = serf_bucket_limit_create(ctx->body, length, + bkt->allocator); + } + else { + v = serf_bucket_headers_get(ctx->headers, "Transfer-Encoding"); + + /* Need to handle multiple transfer-encoding. */ + if (v && strcasecmp("chunked", v) == 0) { + ctx->chunked = 1; + ctx->body = serf_bucket_dechunk_create(ctx->body, + bkt->allocator); + } + + if (!v && (ctx->sl.code == 204 || ctx->sl.code == 304)) { + ctx->state = STATE_DONE; + } + } + v = serf_bucket_headers_get(ctx->headers, "Content-Encoding"); + if (v) { + /* Need to handle multiple content-encoding. */ + if (v && strcasecmp("gzip", v) == 0) { + ctx->body = + serf_bucket_deflate_create(ctx->body, bkt->allocator, + SERF_DEFLATE_GZIP); + } + else if (v && strcasecmp("deflate", v) == 0) { + ctx->body = + serf_bucket_deflate_create(ctx->body, bkt->allocator, + SERF_DEFLATE_DEFLATE); + } + } + /* If we're a HEAD request, we don't receive a body. */ + if (ctx->head_req) { + ctx->state = STATE_DONE; + } + } + break; + case STATE_BODY: + /* Don't do anything. */ + break; + case STATE_TRAILERS: + status = fetch_headers(bkt, ctx); + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + /* If an empty line was read, then we're done. */ + if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) { + ctx->state = STATE_DONE; + return APR_EOF; + } + break; + case STATE_DONE: + return APR_EOF; + default: + /* Not reachable */ + return APR_EGENERAL; + } + + return status; +} + +static apr_status_t wait_for_body(serf_bucket_t *bkt, response_context_t *ctx) +{ + apr_status_t status; + + /* Keep reading and moving through states if we aren't at the BODY */ + while (ctx->state != STATE_BODY) { + status = run_machine(bkt, ctx); + + /* Anything other than APR_SUCCESS means that we cannot immediately + * read again (for now). + */ + if (status) + return status; + } + /* in STATE_BODY */ + + return APR_SUCCESS; +} + +apr_status_t serf_bucket_response_wait_for_headers( + serf_bucket_t *bucket) +{ + response_context_t *ctx = bucket->data; + + return wait_for_body(bucket, ctx); +} + +apr_status_t serf_bucket_response_status( + serf_bucket_t *bkt, + serf_status_line *sline) +{ + response_context_t *ctx = bkt->data; + apr_status_t status; + + if (ctx->state != STATE_STATUS_LINE) { + /* We already read it and moved on. Just return it. */ + *sline = ctx->sl; + return APR_SUCCESS; + } + + /* Running the state machine once will advance the machine, or state + * that the stream isn't ready with enough data. There isn't ever a + * need to run the machine more than once to try and satisfy this. We + * have to look at the state to tell whether it advanced, though, as + * it is quite possible to advance *and* to return APR_EAGAIN. + */ + status = run_machine(bkt, ctx); + if (ctx->state == STATE_HEADERS) { + *sline = ctx->sl; + } + else { + /* Indicate that we don't have the information yet. */ + sline->version = 0; + } + + return status; +} + +static apr_status_t serf_response_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + response_context_t *ctx = bucket->data; + apr_status_t rv; + + rv = wait_for_body(bucket, ctx); + if (rv) { + /* It's not possible to have read anything yet! */ + if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) { + *len = 0; + } + return rv; + } + + rv = serf_bucket_read(ctx->body, requested, data, len); + if (APR_STATUS_IS_EOF(rv)) { + if (ctx->chunked) { + ctx->state = STATE_TRAILERS; + /* Mask the result. */ + rv = APR_SUCCESS; + } + else { + ctx->state = STATE_DONE; + } + } + return rv; +} + +static apr_status_t serf_response_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + response_context_t *ctx = bucket->data; + apr_status_t rv; + + rv = wait_for_body(bucket, ctx); + if (rv) { + return rv; + } + + /* Delegate to the stream bucket to do the readline. */ + return serf_bucket_readline(ctx->body, acceptable, found, data, len); +} + +/* ### need to implement */ +#define serf_response_peek NULL + +const serf_bucket_type_t serf_bucket_type_response = { + "RESPONSE", + serf_response_read, + serf_response_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_response_peek, + serf_response_destroy_and_data, +}; diff --git a/buckets/simple_buckets.c b/buckets/simple_buckets.c new file mode 100644 index 0000000..f36239b --- /dev/null +++ b/buckets/simple_buckets.c @@ -0,0 +1,142 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "serf.h" +#include "serf_bucket_util.h" + + +typedef struct { + const char *original; + const char *current; + apr_size_t remaining; + + serf_simple_freefunc_t freefunc; + void *baton; + +} simple_context_t; + + +static void free_copied_data(void *baton, const char *data) +{ + serf_bucket_mem_free(baton, (char*)data); +} + +serf_bucket_t *serf_bucket_simple_create( + const char *data, + apr_size_t len, + serf_simple_freefunc_t freefunc, + void *freefunc_baton, + serf_bucket_alloc_t *allocator) +{ + simple_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->original = ctx->current = data; + ctx->remaining = len; + ctx->freefunc = freefunc; + ctx->baton = freefunc_baton; + + return serf_bucket_create(&serf_bucket_type_simple, allocator, ctx); +} + +serf_bucket_t *serf_bucket_simple_copy_create( + const char *data, apr_size_t len, + serf_bucket_alloc_t *allocator) +{ + simple_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + + ctx->original = ctx->current = serf_bucket_mem_alloc(allocator, len); + memcpy((char*)ctx->original, data, len); + + ctx->remaining = len; + ctx->freefunc = free_copied_data; + ctx->baton = allocator; + + return serf_bucket_create(&serf_bucket_type_simple, allocator, ctx); +} + +static apr_status_t serf_simple_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + simple_context_t *ctx = bucket->data; + + if (requested == SERF_READ_ALL_AVAIL || requested > ctx->remaining) + requested = ctx->remaining; + + *data = ctx->current; + *len = requested; + + ctx->current += requested; + ctx->remaining -= requested; + + return ctx->remaining ? APR_SUCCESS : APR_EOF; +} + +static apr_status_t serf_simple_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + simple_context_t *ctx = bucket->data; + + /* Returned data will be from current position. */ + *data = ctx->current; + serf_util_readline(&ctx->current, &ctx->remaining, acceptable, found); + + /* See how much ctx->current moved forward. */ + *len = ctx->current - *data; + + return ctx->remaining ? APR_SUCCESS : APR_EOF; +} + +static apr_status_t serf_simple_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + simple_context_t *ctx = bucket->data; + + /* return whatever we have left */ + *data = ctx->current; + *len = ctx->remaining; + + /* we returned everything this bucket will ever hold */ + return APR_EOF; +} + +static void serf_simple_destroy(serf_bucket_t *bucket) +{ + simple_context_t *ctx = bucket->data; + + if (ctx->freefunc) + (*ctx->freefunc)(ctx->baton, ctx->original); + + serf_default_destroy_and_data(bucket); +} + + +const serf_bucket_type_t serf_bucket_type_simple = { + "SIMPLE", + serf_simple_read, + serf_simple_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_simple_peek, + serf_simple_destroy, +}; diff --git a/buckets/socket_buckets.c b/buckets/socket_buckets.c new file mode 100644 index 0000000..dd2469a --- /dev/null +++ b/buckets/socket_buckets.c @@ -0,0 +1,114 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "serf.h" +#include "serf_bucket_util.h" + + +typedef struct { + apr_socket_t *skt; + + serf_databuf_t databuf; + + /* Progress callback */ + serf_progress_t progress_func; + void *progress_baton; +} socket_context_t; + + +static apr_status_t socket_reader(void *baton, apr_size_t bufsize, + char *buf, apr_size_t *len) +{ + socket_context_t *ctx = baton; + apr_status_t status; + + *len = bufsize; + status = apr_socket_recv(ctx->skt, buf, len); + + if (ctx->progress_func) + ctx->progress_func(ctx->progress_baton, *len, 0); + + return status; +} + +serf_bucket_t *serf_bucket_socket_create( + apr_socket_t *skt, + serf_bucket_alloc_t *allocator) +{ + socket_context_t *ctx; + + /* Oh, well. */ + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->skt = skt; + + serf_databuf_init(&ctx->databuf); + ctx->databuf.read = socket_reader; + ctx->databuf.read_baton = ctx; + + ctx->progress_func = ctx->progress_baton = NULL; + return serf_bucket_create(&serf_bucket_type_socket, allocator, ctx); +} + +void serf_bucket_socket_set_read_progress_cb( + serf_bucket_t *bucket, + const serf_progress_t progress_func, + void *progress_baton) +{ + socket_context_t *ctx = bucket->data; + + ctx->progress_func = progress_func; + ctx->progress_baton = progress_baton; +} + +static apr_status_t serf_socket_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + socket_context_t *ctx = bucket->data; + + return serf_databuf_read(&ctx->databuf, requested, data, len); +} + +static apr_status_t serf_socket_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + socket_context_t *ctx = bucket->data; + + return serf_databuf_readline(&ctx->databuf, acceptable, found, data, len); +} + +static apr_status_t serf_socket_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + socket_context_t *ctx = bucket->data; + + return serf_databuf_peek(&ctx->databuf, data, len); +} + +const serf_bucket_type_t serf_bucket_type_socket = { + "SOCKET", + serf_socket_read, + serf_socket_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_socket_peek, + serf_default_destroy_and_data, +}; diff --git a/buckets/ssl_buckets.c b/buckets/ssl_buckets.c new file mode 100644 index 0000000..3f43543 --- /dev/null +++ b/buckets/ssl_buckets.c @@ -0,0 +1,1629 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ---- + * + * For the OpenSSL thread-safety locking code: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Originally developed by Aaron Bannert and Justin Erenkrantz, eBuilt. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "serf.h" +#include "serf_bucket_util.h" + +#include +#include +#include +#include +#include + +#ifndef APR_VERSION_AT_LEAST /* Introduced in APR 1.3.0 */ +#define APR_VERSION_AT_LEAST(major,minor,patch) \ + (((major) < APR_MAJOR_VERSION) \ + || ((major) == APR_MAJOR_VERSION && (minor) < APR_MINOR_VERSION) \ + || ((major) == APR_MAJOR_VERSION && (minor) == APR_MINOR_VERSION && \ + (patch) <= APR_PATCH_VERSION)) +#endif /* APR_VERSION_AT_LEAST */ + +#ifndef APR_ARRAY_PUSH +#define APR_ARRAY_PUSH(ary,type) (*((type *)apr_array_push(ary))) +#endif + + +/*#define SSL_VERBOSE*/ + +/* + * Here's an overview of the SSL bucket's relationship to OpenSSL and serf. + * + * HTTP request: SSLENCRYPT(REQUEST) + * [context.c reads from SSLENCRYPT and writes out to the socket] + * HTTP response: RESPONSE(SSLDECRYPT(SOCKET)) + * [handler function reads from RESPONSE which in turn reads from SSLDECRYPT] + * + * HTTP request read call path: + * + * write_to_connection + * |- serf_bucket_read on SSLENCRYPT + * |- serf_ssl_read + * |- serf_databuf_read + * |- common_databuf_prep + * |- ssl_encrypt + * |- 1. Try to read pending encrypted data; If available, return. + * |- 2. Try to read from ctx->stream [REQUEST bucket] + * |- 3. Call SSL_write with read data + * |- ... + * |- bio_bucket_read can be called + * |- bio_bucket_write with encrypted data + * |- store in sink + * |- 4. If successful, read pending encrypted data and return. + * |- 5. If fails, place read data back in ctx->stream + * + * HTTP response read call path: + * + * read_from_connection + * |- acceptor + * |- handler + * |- ... + * |- serf_bucket_read(SSLDECRYPT) + * |- serf_ssl_read + * |- serf_databuf_read + * |- ssl_decrypt + * |- 1. SSL_read() for pending decrypted data; if any, return. + * |- 2. Try to read from ctx->stream [SOCKET bucket] + * |- 3. Append data to ssl_ctx->source + * |- 4. Call SSL_read() + * |- ... + * |- bio_bucket_write can be called + * |- bio_bucket_read + * |- read data from ssl_ctx->source + * |- If data read, return it. + * |- If an error, set the STATUS value and return. + * + */ + +typedef struct bucket_list { + serf_bucket_t *bucket; + struct bucket_list *next; +} bucket_list_t; + +typedef struct { + /* Helper to read data. Wraps stream. */ + serf_databuf_t databuf; + + /* Our source for more data. */ + serf_bucket_t *stream; + + /* The next set of buckets */ + bucket_list_t *stream_next; + + /* The status of the last thing we read. */ + apr_status_t status; + apr_status_t exhausted; + int exhausted_reset; + + /* Data we've read but not processed. */ + serf_bucket_t *pending; +} serf_ssl_stream_t; + +struct serf_ssl_context_t { + /* How many open buckets refer to this context. */ + int refcount; + + /* The pool that this context uses. */ + apr_pool_t *pool; + + /* The allocator associated with the above pool. */ + serf_bucket_alloc_t *allocator; + + /* Internal OpenSSL parameters */ + SSL_CTX *ctx; + SSL *ssl; + BIO *bio; + + serf_ssl_stream_t encrypt; + serf_ssl_stream_t decrypt; + + /* Client cert callbacks */ + serf_ssl_need_client_cert_t cert_callback; + void *cert_userdata; + apr_pool_t *cert_cache_pool; + const char *cert_file_success; + + /* Client cert PW callbacks */ + serf_ssl_need_cert_password_t cert_pw_callback; + void *cert_pw_userdata; + apr_pool_t *cert_pw_cache_pool; + const char *cert_pw_success; + + /* Server cert callbacks */ + serf_ssl_need_server_cert_t server_cert_callback; + serf_ssl_server_cert_chain_cb_t server_cert_chain_callback; + void *server_cert_userdata; + + const char *cert_path; + + X509 *cached_cert; + EVP_PKEY *cached_cert_pw; + + apr_status_t pending_err; +}; + +typedef struct { + /* The bucket-independent ssl context that this bucket is associated with */ + serf_ssl_context_t *ssl_ctx; + + /* Pointer to the 'right' databuf. */ + serf_databuf_t *databuf; + + /* Pointer to our stream, so we can find it later. */ + serf_bucket_t **our_stream; +} ssl_context_t; + +struct serf_ssl_certificate_t { + X509 *ssl_cert; + int depth; +}; + +/* Returns the amount read. */ +static int bio_bucket_read(BIO *bio, char *in, int inlen) +{ + serf_ssl_context_t *ctx = bio->ptr; + const char *data; + apr_status_t status; + apr_size_t len; + +#ifdef SSL_VERBOSE + printf("bio_bucket_read called for %d bytes\n", inlen); +#endif + + if (ctx->encrypt.status == SERF_ERROR_WAIT_CONN + && BIO_should_read(ctx->bio)) { +#ifdef SSL_VERBOSE + printf("bio_bucket_read waiting: (%d %d %d)\n", + BIO_should_retry(ctx->bio), BIO_should_read(ctx->bio), + BIO_get_retry_flags(ctx->bio)); +#endif + /* Falling back... */ + ctx->encrypt.exhausted_reset = 1; + BIO_clear_retry_flags(bio); + } + + status = serf_bucket_read(ctx->decrypt.pending, inlen, &data, &len); + + ctx->decrypt.status = status; +#ifdef SSL_VERBOSE + printf("bio_bucket_read received %d bytes (%d)\n", len, status); +#endif + + if (!SERF_BUCKET_READ_ERROR(status)) { + /* Oh suck. */ + if (len) { + memcpy(in, data, len); + return len; + } + if (APR_STATUS_IS_EOF(status)) { + BIO_set_retry_read(bio); + return -1; + } + } + + return -1; +} + +/* Returns the amount written. */ +static int bio_bucket_write(BIO *bio, const char *in, int inl) +{ + serf_ssl_context_t *ctx = bio->ptr; + serf_bucket_t *tmp; + +#ifdef SSL_VERBOSE + printf("bio_bucket_write called for %d bytes\n", inl); +#endif + if (ctx->encrypt.status == SERF_ERROR_WAIT_CONN + && !BIO_should_read(ctx->bio)) { +#ifdef SSL_VERBOSE + printf("bio_bucket_write waiting: (%d %d %d)\n", + BIO_should_retry(ctx->bio), BIO_should_read(ctx->bio), + BIO_get_retry_flags(ctx->bio)); +#endif + /* Falling back... */ + ctx->encrypt.exhausted_reset = 1; + BIO_clear_retry_flags(bio); + } + + tmp = serf_bucket_simple_copy_create(in, inl, + ctx->encrypt.pending->allocator); + + serf_bucket_aggregate_append(ctx->encrypt.pending, tmp); + + return inl; +} + +/* Returns the amount read. */ +static int bio_file_read(BIO *bio, char *in, int inlen) +{ + apr_file_t *file = bio->ptr; + apr_status_t status; + apr_size_t len; + + BIO_clear_retry_flags(bio); + + len = inlen; + status = apr_file_read(file, in, &len); + + if (!SERF_BUCKET_READ_ERROR(status)) { + /* Oh suck. */ + if (APR_STATUS_IS_EOF(status)) { + BIO_set_retry_read(bio); + return -1; + } else { + return len; + } + } + + return -1; +} + +/* Returns the amount written. */ +static int bio_file_write(BIO *bio, const char *in, int inl) +{ + apr_file_t *file = bio->ptr; + apr_size_t nbytes; + + BIO_clear_retry_flags(bio); + + nbytes = inl; + apr_file_write(file, in, &nbytes); + + return nbytes; +} + +static int bio_file_gets(BIO *bio, char *in, int inlen) +{ + return bio_file_read(bio, in, inlen); +} + +static int bio_bucket_create(BIO *bio) +{ + bio->shutdown = 1; + bio->init = 1; + bio->num = -1; + bio->ptr = NULL; + + return 1; +} + +static int bio_bucket_destroy(BIO *bio) +{ + /* Did we already free this? */ + if (bio == NULL) { + return 0; + } + + return 1; +} + +static long bio_bucket_ctrl(BIO *bio, int cmd, long num, void *ptr) +{ + long ret = 1; + + switch (cmd) { + default: + /* abort(); */ + break; + case BIO_CTRL_FLUSH: + /* At this point we can't force a flush. */ + break; + case BIO_CTRL_PUSH: + case BIO_CTRL_POP: + ret = 0; + break; + } + return ret; +} + +static BIO_METHOD bio_bucket_method = { + BIO_TYPE_MEM, + "Serf SSL encryption and decryption buckets", + bio_bucket_write, + bio_bucket_read, + NULL, /* Is this called? */ + NULL, /* Is this called? */ + bio_bucket_ctrl, + bio_bucket_create, + bio_bucket_destroy, +#ifdef OPENSSL_VERSION_NUMBER + NULL /* sslc does not have the callback_ctrl field */ +#endif +}; + +static BIO_METHOD bio_file_method = { + BIO_TYPE_FILE, + "Wrapper around APR file structures", + bio_file_write, + bio_file_read, + NULL, /* Is this called? */ + bio_file_gets, /* Is this called? */ + bio_bucket_ctrl, + bio_bucket_create, + bio_bucket_destroy, +#ifdef OPENSSL_VERSION_NUMBER + NULL /* sslc does not have the callback_ctrl field */ +#endif +}; + +static int +validate_server_certificate(int cert_valid, X509_STORE_CTX *store_ctx) +{ + SSL *ssl; + serf_ssl_context_t *ctx; + X509 *server_cert; + int err, depth; + int failures = 0; + + ssl = X509_STORE_CTX_get_ex_data(store_ctx, + SSL_get_ex_data_X509_STORE_CTX_idx()); + ctx = SSL_get_app_data(ssl); + + server_cert = X509_STORE_CTX_get_current_cert(store_ctx); + depth = X509_STORE_CTX_get_error_depth(store_ctx); + + /* If the certification was found invalid, get the error and convert it to + something our caller will understand. */ + if (! cert_valid) { + err = X509_STORE_CTX_get_error(store_ctx); + + switch(err) { + case X509_V_ERR_CERT_NOT_YET_VALID: + failures |= SERF_SSL_CERT_NOTYETVALID; + break; + case X509_V_ERR_CERT_HAS_EXPIRED: + failures |= SERF_SSL_CERT_EXPIRED; + break; + case X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT: + case X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN: + failures |= SERF_SSL_CERT_SELF_SIGNED; + break; + case X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY: + case X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT: + case X509_V_ERR_CERT_UNTRUSTED: + case X509_V_ERR_INVALID_CA: + failures |= SERF_SSL_CERT_UNKNOWNCA; + break; + default: + failures |= SERF_SSL_CERT_UNKNOWN_FAILURE; + break; + } + } + + /* Check certificate expiry dates. */ + if (X509_cmp_current_time(X509_get_notBefore(server_cert)) >= 0) { + failures |= SERF_SSL_CERT_NOTYETVALID; + } + else if (X509_cmp_current_time(X509_get_notAfter(server_cert)) <= 0) { + failures |= SERF_SSL_CERT_EXPIRED; + } + + if (ctx->server_cert_callback && + (depth == 0 || failures)) { + apr_status_t status; + serf_ssl_certificate_t *cert; + apr_pool_t *subpool; + + apr_pool_create(&subpool, ctx->pool); + + cert = apr_palloc(subpool, sizeof(serf_ssl_certificate_t)); + cert->ssl_cert = server_cert; + cert->depth = depth; + + /* Callback for further verification. */ + status = ctx->server_cert_callback(ctx->server_cert_userdata, + failures, cert); + if (status == APR_SUCCESS) + cert_valid = 1; + else + /* Pass the error back to the caller through the context-run. */ + ctx->pending_err = status; + apr_pool_destroy(subpool); + } + + if (ctx->server_cert_chain_callback + && (depth == 0 || failures)) { + apr_status_t status; + STACK_OF(X509) *chain; + const serf_ssl_certificate_t **certs; + int certs_len; + apr_pool_t *subpool; + + apr_pool_create(&subpool, ctx->pool); + + /* Borrow the chain to pass to the callback. */ + chain = X509_STORE_CTX_get_chain(store_ctx); + + /* If the chain can't be retrieved, just pass the current + certificate. */ + /* ### can this actually happen with _get_chain() ? */ + if (!chain) { + serf_ssl_certificate_t *cert = apr_palloc(subpool, sizeof(*cert)); + + cert->ssl_cert = server_cert; + cert->depth = depth; + + /* Room for the server_cert and a trailing NULL. */ + certs = apr_palloc(subpool, sizeof(*certs) * 2); + certs[0] = cert; + + certs_len = 1; + } else { + int i; + + certs_len = sk_X509_num(chain); + + /* Room for all the certs and a trailing NULL. */ + certs = apr_palloc(subpool, sizeof(*certs) * (certs_len + 1)); + for (i = 0; i < certs_len; ++i) { + serf_ssl_certificate_t *cert; + + cert = apr_palloc(subpool, sizeof(*cert)); + cert->ssl_cert = sk_X509_value(chain, i); + cert->depth = i; + + certs[i] = cert; + } + } + certs[certs_len] = NULL; + + /* Callback for further verification. */ + status = ctx->server_cert_chain_callback(ctx->server_cert_userdata, + failures, depth, + certs, certs_len); + if (status == APR_SUCCESS) { + cert_valid = 1; + } else { + /* Pass the error back to the caller through the context-run. */ + ctx->pending_err = status; + } + + apr_pool_destroy(subpool); + } + + return cert_valid; +} + +/* This function reads an encrypted stream and returns the decrypted stream. */ +static apr_status_t ssl_decrypt(void *baton, apr_size_t bufsize, + char *buf, apr_size_t *len) +{ + serf_ssl_context_t *ctx = baton; + apr_size_t priv_len; + apr_status_t status; + const char *data; + int ssl_len; + +#ifdef SSL_VERBOSE + printf("ssl_decrypt: begin %d\n", bufsize); +#endif + + /* Is there some data waiting to be read? */ + ssl_len = SSL_read(ctx->ssl, buf, bufsize); + if (ssl_len > 0) { +#ifdef SSL_VERBOSE + printf("ssl_decrypt: %d bytes (%d); status: %d; flags: %d\n", + ssl_len, bufsize, ctx->decrypt.status, + BIO_get_retry_flags(ctx->bio)); +#endif + *len = ssl_len; + return APR_SUCCESS; + } + + status = serf_bucket_read(ctx->decrypt.stream, bufsize, &data, &priv_len); + + if (!SERF_BUCKET_READ_ERROR(status) && priv_len) { + serf_bucket_t *tmp; + +#ifdef SSL_VERBOSE + printf("ssl_decrypt: read %d bytes (%d); status: %d\n", priv_len, + bufsize, status); +#endif + + tmp = serf_bucket_simple_copy_create(data, priv_len, + ctx->decrypt.pending->allocator); + + serf_bucket_aggregate_append(ctx->decrypt.pending, tmp); + + ssl_len = SSL_read(ctx->ssl, buf, bufsize); + if (ssl_len < 0) { + int ssl_err; + + ssl_err = SSL_get_error(ctx->ssl, ssl_len); + switch (ssl_err) { + case SSL_ERROR_SYSCALL: + *len = 0; + status = ctx->decrypt.status; + break; + case SSL_ERROR_WANT_READ: + *len = 0; + status = APR_EAGAIN; + break; + case SSL_ERROR_SSL: + *len = 0; + status = ctx->pending_err ? ctx->pending_err : APR_EGENERAL; + ctx->pending_err = 0; + break; + default: + *len = 0; + status = APR_EGENERAL; + break; + } + } + else { + *len = ssl_len; +#ifdef SSL_VERBOSE + printf("---\n%s\n-(%d)-\n", buf, *len); +#endif + } + } + else { + *len = 0; + } +#ifdef SSL_VERBOSE + printf("ssl_decrypt: %d %d %d\n", status, *len, + BIO_get_retry_flags(ctx->bio)); +#endif + return status; +} + +/* This function reads a decrypted stream and returns an encrypted stream. */ +static apr_status_t ssl_encrypt(void *baton, apr_size_t bufsize, + char *buf, apr_size_t *len) +{ + const char *data; + apr_size_t interim_bufsize; + serf_ssl_context_t *ctx = baton; + apr_status_t status; + +#ifdef SSL_VERBOSE + printf("ssl_encrypt: begin %d\n", bufsize); +#endif + + /* Try to read already encrypted but unread data first. */ + status = serf_bucket_read(ctx->encrypt.pending, bufsize, &data, len); + if (SERF_BUCKET_READ_ERROR(status)) { + return status; + } + + /* Aha, we read something. Return that now. */ + if (*len) { + memcpy(buf, data, *len); + if (APR_STATUS_IS_EOF(status)) { + status = APR_SUCCESS; + } +#ifdef SSL_VERBOSE + printf("ssl_encrypt: %d %d %d (quick read)\n", status, *len, + BIO_get_retry_flags(ctx->bio)); +#endif + return status; + } + + if (BIO_should_retry(ctx->bio) && BIO_should_write(ctx->bio)) { +#ifdef SSL_VERBOSE + printf("ssl_encrypt: %d %d %d (should write exit)\n", status, *len, + BIO_get_retry_flags(ctx->bio)); +#endif + return APR_EAGAIN; + } + + /* If we were previously blocked, unblock ourselves now. */ + if (BIO_should_read(ctx->bio)) { +#ifdef SSL_VERBOSE + printf("ssl_encrypt: reset %d %d (%d %d %d)\n", status, + ctx->encrypt.status, + BIO_should_retry(ctx->bio), BIO_should_read(ctx->bio), + BIO_get_retry_flags(ctx->bio)); +#endif + ctx->encrypt.status = APR_SUCCESS; + ctx->encrypt.exhausted_reset = 0; + } + + /* Oh well, read from our stream now. */ + interim_bufsize = bufsize; + do { + apr_size_t interim_len; + + if (!ctx->encrypt.status) { + struct iovec vecs[64]; + int vecs_read; + + status = serf_bucket_read_iovec(ctx->encrypt.stream, + interim_bufsize, 64, vecs, + &vecs_read); + + if (!SERF_BUCKET_READ_ERROR(status) && vecs_read) { + char *vecs_data; + int i, cur, vecs_data_len; + int ssl_len; + + /* Combine the buffers of the iovec into one buffer, as + that is with SSL_write requires. */ + vecs_data_len = 0; + for (i = 0; i < vecs_read; i++) { + vecs_data_len += vecs[i].iov_len; + } + + vecs_data = serf_bucket_mem_alloc(ctx->allocator, + vecs_data_len); + + cur = 0; + for (i = 0; i < vecs_read; i++) { + memcpy(vecs_data + cur, vecs[i].iov_base, vecs[i].iov_len); + cur += vecs[i].iov_len; + } + + interim_bufsize -= vecs_data_len; + interim_len = vecs_data_len; + +#ifdef SSL_VERBOSE + printf("ssl_encrypt: bucket read %d bytes; status %d\n", + interim_len, status); + printf("---\n%s\n-(%d)-\n", vecs_data, interim_len); +#endif + /* Stash our status away. */ + ctx->encrypt.status = status; + + ssl_len = SSL_write(ctx->ssl, vecs_data, interim_len); +#ifdef SSL_VERBOSE + printf("ssl_encrypt: SSL write: %d\n", ssl_len); +#endif + /* We're done. */ + serf_bucket_mem_free(ctx->allocator, vecs_data); + + /* If we failed to write... */ + if (ssl_len < 0) { + int ssl_err; + + /* Ah, bugger. We need to put that data back. */ + serf_bucket_aggregate_prepend_iovec(ctx->encrypt.stream, + vecs, vecs_read); + + ssl_err = SSL_get_error(ctx->ssl, ssl_len); +#ifdef SSL_VERBOSE + printf("ssl_encrypt: SSL write error: %d\n", ssl_err); +#endif + if (ssl_err == SSL_ERROR_SYSCALL) { + status = ctx->encrypt.status; + if (SERF_BUCKET_READ_ERROR(status)) { + return status; + } + } + else { + /* Oh, no. */ + if (ssl_err == SSL_ERROR_WANT_READ) { + status = SERF_ERROR_WAIT_CONN; + } + else { + status = APR_EGENERAL; + } + } +#ifdef SSL_VERBOSE + printf("ssl_encrypt: SSL write error: %d %d\n", status, *len); +#endif + } + } + } + else { + interim_len = 0; + *len = 0; + status = ctx->encrypt.status; + } + + } while (!status && interim_bufsize); + + /* Okay, we exhausted our underlying stream. */ + if (!SERF_BUCKET_READ_ERROR(status)) { + apr_status_t agg_status; + struct iovec vecs[64]; + int vecs_read, i; + + /* We read something! */ + agg_status = serf_bucket_read_iovec(ctx->encrypt.pending, bufsize, + 64, vecs, &vecs_read); + *len = 0; + for (i = 0; i < vecs_read; i++) { + memcpy(buf + *len, vecs[i].iov_base, vecs[i].iov_len); + *len += vecs[i].iov_len; + } + +#ifdef SSL_VERBOSE + printf("ssl_encrypt read agg: %d %d %d %d\n", status, agg_status, + ctx->encrypt.status, *len); +#endif + + if (!agg_status) { + status = agg_status; + } + } + + if (status == SERF_ERROR_WAIT_CONN + && BIO_should_retry(ctx->bio) && BIO_should_read(ctx->bio)) { + ctx->encrypt.exhausted = ctx->encrypt.status; + ctx->encrypt.status = SERF_ERROR_WAIT_CONN; + } + +#ifdef SSL_VERBOSE + printf("ssl_encrypt finished: %d %d (%d %d %d)\n", status, *len, + BIO_should_retry(ctx->bio), BIO_should_read(ctx->bio), + BIO_get_retry_flags(ctx->bio)); +#endif + return status; +} + +#if APR_HAS_THREADS +static apr_pool_t *ssl_pool; +static apr_thread_mutex_t **ssl_locks; + +typedef struct CRYPTO_dynlock_value { + apr_thread_mutex_t *lock; +} CRYPTO_dynlock_value; + +static CRYPTO_dynlock_value *ssl_dyn_create(const char* file, int line) +{ + CRYPTO_dynlock_value *l; + apr_status_t rv; + + l = apr_palloc(ssl_pool, sizeof(CRYPTO_dynlock_value)); + rv = apr_thread_mutex_create(&l->lock, APR_THREAD_MUTEX_DEFAULT, ssl_pool); + if (rv != APR_SUCCESS) { + /* FIXME: return error here */ + } + return l; +} + +static void ssl_dyn_lock(int mode, CRYPTO_dynlock_value *l, const char *file, + int line) +{ + if (mode & CRYPTO_LOCK) { + apr_thread_mutex_lock(l->lock); + } + else if (mode & CRYPTO_UNLOCK) { + apr_thread_mutex_unlock(l->lock); + } +} + +static void ssl_dyn_destroy(CRYPTO_dynlock_value *l, const char *file, + int line) +{ + apr_thread_mutex_destroy(l->lock); +} + +static void ssl_lock(int mode, int n, const char *file, int line) +{ + if (mode & CRYPTO_LOCK) { + apr_thread_mutex_lock(ssl_locks[n]); + } + else if (mode & CRYPTO_UNLOCK) { + apr_thread_mutex_unlock(ssl_locks[n]); + } +} + +static unsigned long ssl_id(void) +{ + /* FIXME: This is lame and not portable. -aaron */ + return (unsigned long) apr_os_thread_current(); +} + +static apr_status_t cleanup_ssl(void *data) +{ + CRYPTO_set_locking_callback(NULL); + CRYPTO_set_id_callback(NULL); + CRYPTO_set_dynlock_create_callback(NULL); + CRYPTO_set_dynlock_lock_callback(NULL); + CRYPTO_set_dynlock_destroy_callback(NULL); + + return APR_SUCCESS; +} + +#endif + +static apr_uint32_t have_init_ssl = 0; + +static void init_ssl_libraries(void) +{ + apr_uint32_t val; +#if APR_VERSION_AT_LEAST(1,0,0) + val = apr_atomic_xchg32(&have_init_ssl, 1); +#else + val = apr_atomic_cas(&have_init_ssl, 1, 0); +#endif + + if (!val) { +#if APR_HAS_THREADS + int i, numlocks; +#endif + CRYPTO_malloc_init(); + ERR_load_crypto_strings(); + SSL_load_error_strings(); + SSL_library_init(); + OpenSSL_add_all_algorithms(); + +#if APR_HAS_THREADS + numlocks = CRYPTO_num_locks(); + apr_pool_create(&ssl_pool, NULL); + ssl_locks = apr_palloc(ssl_pool, sizeof(apr_thread_mutex_t*)*numlocks); + for (i = 0; i < numlocks; i++) { + apr_status_t rv; + + /* Intraprocess locks don't /need/ a filename... */ + rv = apr_thread_mutex_create(&ssl_locks[i], + APR_THREAD_MUTEX_DEFAULT, ssl_pool); + if (rv != APR_SUCCESS) { + /* FIXME: error out here */ + } + } + CRYPTO_set_locking_callback(ssl_lock); + CRYPTO_set_id_callback(ssl_id); + CRYPTO_set_dynlock_create_callback(ssl_dyn_create); + CRYPTO_set_dynlock_lock_callback(ssl_dyn_lock); + CRYPTO_set_dynlock_destroy_callback(ssl_dyn_destroy); + + apr_pool_cleanup_register(ssl_pool, NULL, cleanup_ssl, cleanup_ssl); +#endif + } +} + +static int ssl_need_client_cert(SSL *ssl, X509 **cert, EVP_PKEY **pkey) +{ + serf_ssl_context_t *ctx = SSL_get_app_data(ssl); + apr_status_t status; + + if (ctx->cached_cert) { + *cert = ctx->cached_cert; + *pkey = ctx->cached_cert_pw; + return 1; + } + + while (ctx->cert_callback) { + const char *cert_path; + apr_file_t *cert_file; + BIO *bio; + PKCS12 *p12; + int i; + int retrying_success = 0; + + if (ctx->cert_file_success) { + status = APR_SUCCESS; + cert_path = ctx->cert_file_success; + ctx->cert_file_success = NULL; + retrying_success = 1; + } else { + status = ctx->cert_callback(ctx->cert_userdata, &cert_path); + } + + if (status || !cert_path) { + break; + } + + /* Load the x.509 cert file stored in PKCS12 */ + status = apr_file_open(&cert_file, cert_path, APR_READ, APR_OS_DEFAULT, + ctx->pool); + + if (status) { + continue; + } + + bio = BIO_new(&bio_file_method); + bio->ptr = cert_file; + + ctx->cert_path = cert_path; + p12 = d2i_PKCS12_bio(bio, NULL); + apr_file_close(cert_file); + + i = PKCS12_parse(p12, NULL, pkey, cert, NULL); + + if (i == 1) { + PKCS12_free(p12); + ctx->cached_cert = *cert; + ctx->cached_cert_pw = *pkey; + if (!retrying_success && ctx->cert_cache_pool) { + const char *c; + + c = apr_pstrdup(ctx->cert_cache_pool, ctx->cert_path); + + apr_pool_userdata_setn(c, "serf:ssl:cert", + apr_pool_cleanup_null, + ctx->cert_cache_pool); + } + return 1; + } + else { + int err = ERR_get_error(); + ERR_clear_error(); + if (ERR_GET_LIB(err) == ERR_LIB_PKCS12 && + ERR_GET_REASON(err) == PKCS12_R_MAC_VERIFY_FAILURE) { + if (ctx->cert_pw_callback) { + const char *password; + + if (ctx->cert_pw_success) { + status = APR_SUCCESS; + password = ctx->cert_pw_success; + ctx->cert_pw_success = NULL; + } else { + status = ctx->cert_pw_callback(ctx->cert_pw_userdata, + ctx->cert_path, + &password); + } + + if (!status && password) { + i = PKCS12_parse(p12, password, pkey, cert, NULL); + if (i == 1) { + PKCS12_free(p12); + ctx->cached_cert = *cert; + ctx->cached_cert_pw = *pkey; + if (!retrying_success && ctx->cert_cache_pool) { + const char *c; + + c = apr_pstrdup(ctx->cert_cache_pool, + ctx->cert_path); + + apr_pool_userdata_setn(c, "serf:ssl:cert", + apr_pool_cleanup_null, + ctx->cert_cache_pool); + } + if (!retrying_success && ctx->cert_pw_cache_pool) { + const char *c; + + c = apr_pstrdup(ctx->cert_pw_cache_pool, + password); + + apr_pool_userdata_setn(c, "serf:ssl:certpw", + apr_pool_cleanup_null, + ctx->cert_pw_cache_pool); + } + return 1; + } + } + } + PKCS12_free(p12); + return 0; + } + else { + printf("OpenSSL cert error: %d %d %d\n", ERR_GET_LIB(err), + ERR_GET_FUNC(err), + ERR_GET_REASON(err)); + PKCS12_free(p12); + } + } + } + + return 0; +} + + +void serf_ssl_client_cert_provider_set( + serf_ssl_context_t *context, + serf_ssl_need_client_cert_t callback, + void *data, + void *cache_pool) +{ + context->cert_callback = callback; + context->cert_userdata = data; + context->cert_cache_pool = cache_pool; + if (context->cert_cache_pool) { + apr_pool_userdata_get((void**)&context->cert_file_success, + "serf:ssl:cert", cache_pool); + } +} + + +void serf_ssl_client_cert_password_set( + serf_ssl_context_t *context, + serf_ssl_need_cert_password_t callback, + void *data, + void *cache_pool) +{ + context->cert_pw_callback = callback; + context->cert_pw_userdata = data; + context->cert_pw_cache_pool = cache_pool; + if (context->cert_pw_cache_pool) { + apr_pool_userdata_get((void**)&context->cert_pw_success, + "serf:ssl:certpw", cache_pool); + } +} + + +void serf_ssl_server_cert_callback_set( + serf_ssl_context_t *context, + serf_ssl_need_server_cert_t callback, + void *data) +{ + context->server_cert_callback = callback; + context->server_cert_userdata = data; +} + +void serf_ssl_server_cert_chain_callback_set( + serf_ssl_context_t *context, + serf_ssl_need_server_cert_t cert_callback, + serf_ssl_server_cert_chain_cb_t cert_chain_callback, + void *data) +{ + context->server_cert_callback = cert_callback; + context->server_cert_chain_callback = cert_chain_callback; + context->server_cert_userdata = data; +} + +static serf_ssl_context_t *ssl_init_context(void) +{ + serf_ssl_context_t *ssl_ctx; + apr_pool_t *pool; + serf_bucket_alloc_t *allocator; + + init_ssl_libraries(); + + apr_pool_create(&pool, NULL); + allocator = serf_bucket_allocator_create(pool, NULL, NULL); + + ssl_ctx = serf_bucket_mem_alloc(allocator, sizeof(*ssl_ctx)); + + ssl_ctx->refcount = 0; + ssl_ctx->pool = pool; + ssl_ctx->allocator = allocator; + + ssl_ctx->ctx = SSL_CTX_new(SSLv23_client_method()); + + SSL_CTX_set_client_cert_cb(ssl_ctx->ctx, ssl_need_client_cert); + ssl_ctx->cached_cert = 0; + ssl_ctx->cached_cert_pw = 0; + ssl_ctx->pending_err = APR_SUCCESS; + + ssl_ctx->cert_callback = NULL; + ssl_ctx->cert_pw_callback = NULL; + ssl_ctx->server_cert_callback = NULL; + ssl_ctx->server_cert_chain_callback = NULL; + + SSL_CTX_set_verify(ssl_ctx->ctx, SSL_VERIFY_PEER, + validate_server_certificate); + SSL_CTX_set_options(ssl_ctx->ctx, SSL_OP_ALL); + + ssl_ctx->ssl = SSL_new(ssl_ctx->ctx); + ssl_ctx->bio = BIO_new(&bio_bucket_method); + ssl_ctx->bio->ptr = ssl_ctx; + + SSL_set_bio(ssl_ctx->ssl, ssl_ctx->bio, ssl_ctx->bio); + + SSL_set_connect_state(ssl_ctx->ssl); + + SSL_set_app_data(ssl_ctx->ssl, ssl_ctx); + + ssl_ctx->encrypt.stream = NULL; + ssl_ctx->encrypt.stream_next = NULL; + ssl_ctx->encrypt.pending = serf_bucket_aggregate_create(allocator); + ssl_ctx->encrypt.status = APR_SUCCESS; + serf_databuf_init(&ssl_ctx->encrypt.databuf); + ssl_ctx->encrypt.databuf.read = ssl_encrypt; + ssl_ctx->encrypt.databuf.read_baton = ssl_ctx; + + ssl_ctx->decrypt.stream = NULL; + ssl_ctx->decrypt.pending = serf_bucket_aggregate_create(allocator); + ssl_ctx->decrypt.status = APR_SUCCESS; + serf_databuf_init(&ssl_ctx->decrypt.databuf); + ssl_ctx->decrypt.databuf.read = ssl_decrypt; + ssl_ctx->decrypt.databuf.read_baton = ssl_ctx; + + return ssl_ctx; +} + +static apr_status_t ssl_free_context( + serf_ssl_context_t *ssl_ctx) +{ + apr_pool_t *p; + + /* If never had the pending buckets, don't try to free them. */ + if (ssl_ctx->decrypt.pending != NULL) { + serf_bucket_destroy(ssl_ctx->decrypt.pending); + } + if (ssl_ctx->encrypt.pending != NULL) { + serf_bucket_destroy(ssl_ctx->encrypt.pending); + } + + /* SSL_free implicitly frees the underlying BIO. */ + SSL_free(ssl_ctx->ssl); + SSL_CTX_free(ssl_ctx->ctx); + + p = ssl_ctx->pool; + + serf_bucket_mem_free(ssl_ctx->allocator, ssl_ctx); + apr_pool_destroy(p); + + return APR_SUCCESS; +} + +static serf_bucket_t * serf_bucket_ssl_create( + serf_ssl_context_t *ssl_ctx, + serf_bucket_alloc_t *allocator, + const serf_bucket_type_t *type) +{ + ssl_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + if (!ssl_ctx) { + ctx->ssl_ctx = ssl_init_context(); + } + else { + ctx->ssl_ctx = ssl_ctx; + } + ctx->ssl_ctx->refcount++; + + return serf_bucket_create(type, allocator, ctx); +} + +apr_status_t serf_ssl_set_hostname(serf_ssl_context_t *context, + const char * hostname) +{ +#ifdef SSL_set_tlsext_host_name + if (SSL_set_tlsext_host_name(context->ssl, hostname) != 1) { + ERR_clear_error(); + } +#endif + return APR_SUCCESS; +} + +apr_status_t serf_ssl_use_default_certificates(serf_ssl_context_t *ssl_ctx) +{ + X509_STORE *store = SSL_CTX_get_cert_store(ssl_ctx->ctx); + + int result = X509_STORE_set_default_paths(store); + + return result ? APR_SUCCESS : APR_EGENERAL; +} + +apr_status_t serf_ssl_load_cert_file( + serf_ssl_certificate_t **cert, + const char *file_path, + apr_pool_t *pool) +{ + FILE *fp = fopen(file_path, "r"); + + if (fp) { + X509 *ssl_cert = PEM_read_X509(fp, NULL, NULL, NULL); + fclose(fp); + + if (ssl_cert) { + *cert = apr_palloc(pool, sizeof(serf_ssl_certificate_t)); + (*cert)->ssl_cert = ssl_cert; + + return APR_SUCCESS; + } + } + + return APR_EGENERAL; +} + + +apr_status_t serf_ssl_trust_cert( + serf_ssl_context_t *ssl_ctx, + serf_ssl_certificate_t *cert) +{ + X509_STORE *store = SSL_CTX_get_cert_store(ssl_ctx->ctx); + + int result = X509_STORE_add_cert(store, cert->ssl_cert); + + return result ? APR_SUCCESS : APR_EGENERAL; +} + + +serf_bucket_t *serf_bucket_ssl_decrypt_create( + serf_bucket_t *stream, + serf_ssl_context_t *ssl_ctx, + serf_bucket_alloc_t *allocator) +{ + serf_bucket_t *bkt; + ssl_context_t *ctx; + + bkt = serf_bucket_ssl_create(ssl_ctx, allocator, + &serf_bucket_type_ssl_decrypt); + + ctx = bkt->data; + + ctx->databuf = &ctx->ssl_ctx->decrypt.databuf; + if (ctx->ssl_ctx->decrypt.stream != NULL) { + return NULL; + } + ctx->ssl_ctx->decrypt.stream = stream; + ctx->our_stream = &ctx->ssl_ctx->decrypt.stream; + + return bkt; +} + + +serf_ssl_context_t *serf_bucket_ssl_decrypt_context_get( + serf_bucket_t *bucket) +{ + ssl_context_t *ctx = bucket->data; + return ctx->ssl_ctx; +} + + +serf_bucket_t *serf_bucket_ssl_encrypt_create( + serf_bucket_t *stream, + serf_ssl_context_t *ssl_ctx, + serf_bucket_alloc_t *allocator) +{ + serf_bucket_t *bkt; + ssl_context_t *ctx; + + bkt = serf_bucket_ssl_create(ssl_ctx, allocator, + &serf_bucket_type_ssl_encrypt); + + ctx = bkt->data; + + ctx->databuf = &ctx->ssl_ctx->encrypt.databuf; + ctx->our_stream = &ctx->ssl_ctx->encrypt.stream; + if (ctx->ssl_ctx->encrypt.stream == NULL) { + serf_bucket_t *tmp = serf_bucket_aggregate_create(stream->allocator); + serf_bucket_aggregate_append(tmp, stream); + ctx->ssl_ctx->encrypt.stream = tmp; + } + else { + bucket_list_t *new_list; + + new_list = serf_bucket_mem_alloc(ctx->ssl_ctx->allocator, + sizeof(*new_list)); + new_list->bucket = stream; + new_list->next = NULL; + if (ctx->ssl_ctx->encrypt.stream_next == NULL) { + ctx->ssl_ctx->encrypt.stream_next = new_list; + } + else { + bucket_list_t *scan = ctx->ssl_ctx->encrypt.stream_next; + + while (scan->next != NULL) + scan = scan->next; + scan->next = new_list; + } + } + + return bkt; +} + + +serf_ssl_context_t *serf_bucket_ssl_encrypt_context_get( + serf_bucket_t *bucket) +{ + ssl_context_t *ctx = bucket->data; + return ctx->ssl_ctx; +} + +/* Functions to read a serf_ssl_certificate structure. */ + +/* Creates a hash_table with keys (E, CN, OU, O, L, ST and C). */ +static apr_hash_t * +convert_X509_NAME_to_table(X509_NAME *org, apr_pool_t *pool) +{ + char buf[1024]; + int ret; + + apr_hash_t *tgt = apr_hash_make(pool); + + ret = X509_NAME_get_text_by_NID(org, + NID_commonName, + buf, 1024); + if (ret != -1) + apr_hash_set(tgt, "CN", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf)); + ret = X509_NAME_get_text_by_NID(org, + NID_pkcs9_emailAddress, + buf, 1024); + if (ret != -1) + apr_hash_set(tgt, "E", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf)); + ret = X509_NAME_get_text_by_NID(org, + NID_organizationalUnitName, + buf, 1024); + if (ret != -1) + apr_hash_set(tgt, "OU", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf)); + ret = X509_NAME_get_text_by_NID(org, + NID_organizationName, + buf, 1024); + if (ret != -1) + apr_hash_set(tgt, "O", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf)); + ret = X509_NAME_get_text_by_NID(org, + NID_localityName, + buf, 1024); + if (ret != -1) + apr_hash_set(tgt, "L", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf)); + ret = X509_NAME_get_text_by_NID(org, + NID_stateOrProvinceName, + buf, 1024); + if (ret != -1) + apr_hash_set(tgt, "ST", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf)); + ret = X509_NAME_get_text_by_NID(org, + NID_countryName, + buf, 1024); + if (ret != -1) + apr_hash_set(tgt, "C", APR_HASH_KEY_STRING, apr_pstrdup(pool, buf)); + + return tgt; +} + + +int serf_ssl_cert_depth(const serf_ssl_certificate_t *cert) +{ + return cert->depth; +} + + +apr_hash_t *serf_ssl_cert_issuer( + const serf_ssl_certificate_t *cert, + apr_pool_t *pool) +{ + X509_NAME *issuer = X509_get_issuer_name(cert->ssl_cert); + + if (!issuer) + return NULL; + + return convert_X509_NAME_to_table(issuer, pool); +} + + +apr_hash_t *serf_ssl_cert_subject( + const serf_ssl_certificate_t *cert, + apr_pool_t *pool) +{ + X509_NAME *subject = X509_get_subject_name(cert->ssl_cert); + + if (!subject) + return NULL; + + return convert_X509_NAME_to_table(subject, pool); +} + + +apr_hash_t *serf_ssl_cert_certificate( + const serf_ssl_certificate_t *cert, + apr_pool_t *pool) +{ + apr_hash_t *tgt = apr_hash_make(pool); + unsigned int md_size, i; + unsigned char md[EVP_MAX_MD_SIZE]; + BIO *bio; + STACK_OF(GENERAL_NAME) *names; + + /* sha1 fingerprint */ + if (X509_digest(cert->ssl_cert, EVP_sha1(), md, &md_size)) { + const char hex[] = "0123456789ABCDEF"; + char fingerprint[EVP_MAX_MD_SIZE * 3]; + + for (i=0; i> 4]; + fingerprint[(3*i)+1] = hex[(md[i] & 0x0f)]; + fingerprint[(3*i)+2] = ':'; + } + if (md_size > 0) + fingerprint[(3*(md_size-1))+2] = '\0'; + else + fingerprint[0] = '\0'; + + apr_hash_set(tgt, "sha1", APR_HASH_KEY_STRING, + apr_pstrdup(pool, fingerprint)); + } + + /* set expiry dates */ + bio = BIO_new(BIO_s_mem()); + if (bio) { + ASN1_TIME *notBefore, *notAfter; + char buf[256]; + + memset (buf, 0, sizeof (buf)); + notBefore = X509_get_notBefore(cert->ssl_cert); + if (ASN1_TIME_print(bio, notBefore)) { + BIO_read(bio, buf, 255); + apr_hash_set(tgt, "notBefore", APR_HASH_KEY_STRING, + apr_pstrdup(pool, buf)); + } + memset (buf, 0, sizeof (buf)); + notAfter = X509_get_notAfter(cert->ssl_cert); + if (ASN1_TIME_print(bio, notAfter)) { + BIO_read(bio, buf, 255); + apr_hash_set(tgt, "notAfter", APR_HASH_KEY_STRING, + apr_pstrdup(pool, buf)); + } + } + BIO_free(bio); + + /* Get subjectAltNames */ + names = X509_get_ext_d2i(cert->ssl_cert, NID_subject_alt_name, NULL, NULL); + if (names) { + int names_count = sk_GENERAL_NAME_num(names); + + apr_array_header_t *san_arr = apr_array_make(pool, names_count, + sizeof(char*)); + apr_hash_set(tgt, "subjectAltName", APR_HASH_KEY_STRING, san_arr); + for (i = 0; i < names_count; i++) { + char *p = NULL; + GENERAL_NAME *nm = sk_GENERAL_NAME_value(names, i); + + switch (nm->type) { + case GEN_DNS: + p = apr_pstrmemdup(pool, (const char *)nm->d.ia5->data, + nm->d.ia5->length); + break; + default: + /* Don't know what to do - skip. */ + break; + } + if (p) { + APR_ARRAY_PUSH(san_arr, char*) = p; + } + } + sk_GENERAL_NAME_pop_free(names, GENERAL_NAME_free); + } + + return tgt; +} + + +const char *serf_ssl_cert_export( + const serf_ssl_certificate_t *cert, + apr_pool_t *pool) +{ + char *binary_cert; + char *encoded_cert; + int len; + unsigned char *unused; + + /* find the length of the DER encoding. */ + len = i2d_X509(cert->ssl_cert, NULL); + if (len < 0) { + return NULL; + } + + binary_cert = apr_palloc(pool, len); + unused = (unsigned char *)binary_cert; + len = i2d_X509(cert->ssl_cert, &unused); /* unused is incremented */ + if (len < 0) { + return NULL; + } + + encoded_cert = apr_palloc(pool, apr_base64_encode_len(len)); + apr_base64_encode(encoded_cert, binary_cert, len); + + return encoded_cert; +} + +static void serf_ssl_destroy_and_data(serf_bucket_t *bucket) +{ + ssl_context_t *ctx = bucket->data; + + if (!--ctx->ssl_ctx->refcount) { + ssl_free_context(ctx->ssl_ctx); + } + + serf_default_destroy_and_data(bucket); +} + +static void serf_ssl_decrypt_destroy_and_data(serf_bucket_t *bucket) +{ + ssl_context_t *ctx = bucket->data; + + serf_bucket_destroy(*ctx->our_stream); + + serf_ssl_destroy_and_data(bucket); +} + +static void serf_ssl_encrypt_destroy_and_data(serf_bucket_t *bucket) +{ + ssl_context_t *ctx = bucket->data; + serf_ssl_context_t *ssl_ctx = ctx->ssl_ctx; + + if (ssl_ctx->encrypt.stream == *ctx->our_stream) { + serf_bucket_destroy(*ctx->our_stream); + serf_bucket_destroy(ssl_ctx->encrypt.pending); + + /* Reset our encrypted status and databuf. */ + ssl_ctx->encrypt.status = APR_SUCCESS; + ssl_ctx->encrypt.databuf.status = APR_SUCCESS; + + /* Advance to the next stream - if we have one. */ + if (ssl_ctx->encrypt.stream_next == NULL) { + ssl_ctx->encrypt.stream = NULL; + ssl_ctx->encrypt.pending = NULL; + } + else { + bucket_list_t *cur; + + cur = ssl_ctx->encrypt.stream_next; + ssl_ctx->encrypt.stream = cur->bucket; + ssl_ctx->encrypt.pending = + serf_bucket_aggregate_create(cur->bucket->allocator); + ssl_ctx->encrypt.stream_next = cur->next; + serf_bucket_mem_free(ssl_ctx->allocator, cur); + } + } + else { + /* Ah, darn. We haven't sent this one along yet. */ + return; + } + serf_ssl_destroy_and_data(bucket); +} + +static apr_status_t serf_ssl_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + ssl_context_t *ctx = bucket->data; + + return serf_databuf_read(ctx->databuf, requested, data, len); +} + +static apr_status_t serf_ssl_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, + apr_size_t *len) +{ + ssl_context_t *ctx = bucket->data; + + return serf_databuf_readline(ctx->databuf, acceptable, found, data, len); +} + +static apr_status_t serf_ssl_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + ssl_context_t *ctx = bucket->data; + + return serf_databuf_peek(ctx->databuf, data, len); +} + + +const serf_bucket_type_t serf_bucket_type_ssl_encrypt = { + "SSLENCRYPT", + serf_ssl_read, + serf_ssl_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_ssl_peek, + serf_ssl_encrypt_destroy_and_data, +}; + +const serf_bucket_type_t serf_bucket_type_ssl_decrypt = { + "SSLDECRYPT", + serf_ssl_read, + serf_ssl_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_ssl_peek, + serf_ssl_decrypt_destroy_and_data, +}; -- cgit v1.2.1