diff options
Diffstat (limited to 'buckets/bwtp_buckets.c')
-rw-r--r-- | buckets/bwtp_buckets.c | 596 |
1 files changed, 596 insertions, 0 deletions
diff --git a/buckets/bwtp_buckets.c b/buckets/bwtp_buckets.c new file mode 100644 index 0000000..7ef3047 --- /dev/null +++ b/buckets/bwtp_buckets.c @@ -0,0 +1,596 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <apr_pools.h> +#include <apr_strings.h> +#include <apr_lib.h> +#include <apr_date.h> + +#include "serf.h" +#include "serf_bucket_util.h" +#include "serf_bucket_types.h" + +#include <stdlib.h> + +/* This is an implementation of Bidirectional Web Transfer Protocol (BWTP) + * See: + * http://bwtp.wikidot.com/ + */ + +typedef struct { + int channel; + int open; + int type; /* 0 = header, 1 = message */ /* TODO enum? */ + const char *phrase; + serf_bucket_t *headers; + + char req_line[1000]; +} frame_context_t; + +typedef struct { + serf_bucket_t *stream; + serf_bucket_t *body; /* Pointer to the stream wrapping the body. */ + serf_bucket_t *headers; /* holds parsed headers */ + + enum { + STATE_STATUS_LINE, /* reading status line */ + STATE_HEADERS, /* reading headers */ + STATE_BODY, /* reading body */ + STATE_DONE /* we've sent EOF */ + } state; + + /* Buffer for accumulating a line from the response. */ + serf_linebuf_t linebuf; + + int type; /* 0 = header, 1 = message */ /* TODO enum? */ + int channel; + char *phrase; + apr_size_t length; +} incoming_context_t; + + +serf_bucket_t *serf_bucket_bwtp_channel_close( + int channel, + serf_bucket_alloc_t *allocator) +{ + frame_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->type = 0; + ctx->open = 0; + ctx->channel = channel; + ctx->phrase = "CLOSED"; + ctx->headers = serf_bucket_headers_create(allocator); + + return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); +} + +serf_bucket_t *serf_bucket_bwtp_channel_open( + int channel, + const char *uri, + serf_bucket_alloc_t *allocator) +{ + frame_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->type = 0; + ctx->open = 1; + ctx->channel = channel; + ctx->phrase = uri; + ctx->headers = serf_bucket_headers_create(allocator); + + return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); +} + +serf_bucket_t *serf_bucket_bwtp_header_create( + int channel, + const char *phrase, + serf_bucket_alloc_t *allocator) +{ + frame_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->type = 0; + ctx->open = 0; + ctx->channel = channel; + ctx->phrase = phrase; + ctx->headers = serf_bucket_headers_create(allocator); + + return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); +} + +serf_bucket_t *serf_bucket_bwtp_message_create( + int channel, + serf_bucket_t *body, + serf_bucket_alloc_t *allocator) +{ + frame_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->type = 1; + ctx->open = 0; + ctx->channel = channel; + ctx->phrase = "MESSAGE"; + ctx->headers = serf_bucket_headers_create(allocator); + + return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); +} + +int serf_bucket_bwtp_frame_get_channel( + serf_bucket_t *bucket) +{ + if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { + frame_context_t *ctx = bucket->data; + + return ctx->channel; + } + else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { + incoming_context_t *ctx = bucket->data; + + return ctx->channel; + } + + return -1; +} + +int serf_bucket_bwtp_frame_get_type( + serf_bucket_t *bucket) +{ + if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { + frame_context_t *ctx = bucket->data; + + return ctx->type; + } + else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { + incoming_context_t *ctx = bucket->data; + + return ctx->type; + } + + return -1; +} + +const char *serf_bucket_bwtp_frame_get_phrase( + serf_bucket_t *bucket) +{ + if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { + frame_context_t *ctx = bucket->data; + + return ctx->phrase; + } + else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { + incoming_context_t *ctx = bucket->data; + + return ctx->phrase; + } + + return NULL; +} + +serf_bucket_t *serf_bucket_bwtp_frame_get_headers( + serf_bucket_t *bucket) +{ + if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { + frame_context_t *ctx = bucket->data; + + return ctx->headers; + } + else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { + incoming_context_t *ctx = bucket->data; + + return ctx->headers; + } + + return NULL; +} + +static int count_size(void *baton, const char *key, const char *value) +{ + apr_size_t *c = baton; + /* TODO Deal with folding. Yikes. */ + + /* Add in ": " and CRLF - so an extra four bytes. */ + *c += strlen(key) + strlen(value) + 4; + + return 0; +} + +static apr_size_t calc_header_size(serf_bucket_t *hdrs) +{ + apr_size_t size = 0; + + serf_bucket_headers_do(hdrs, count_size, &size); + + return size; +} + +static void serialize_data(serf_bucket_t *bucket) +{ + frame_context_t *ctx = bucket->data; + serf_bucket_t *new_bucket; + apr_size_t req_len; + + /* Serialize the request-line and headers into one mother string, + * and wrap a bucket around it. + */ + req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line), + "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n", + (ctx->type ? "BWM" : "BWH"), + ctx->channel, calc_header_size(ctx->headers), + (ctx->open ? "OPEN " : ""), + ctx->phrase); + new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len, + bucket->allocator); + + /* Build up the new bucket structure. + * + * Note that self needs to become an aggregate bucket so that a + * pointer to self still represents the "right" data. + */ + serf_bucket_aggregate_become(bucket); + + /* Insert the two buckets. */ + serf_bucket_aggregate_append(bucket, new_bucket); + serf_bucket_aggregate_append(bucket, ctx->headers); + + /* Our private context is no longer needed, and is not referred to by + * any existing bucket. Toss it. + */ + serf_bucket_mem_free(bucket->allocator, ctx); +} + +static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + /* Seralize our private data into a new aggregate bucket. */ + serialize_data(bucket); + + /* Delegate to the "new" aggregate bucket to do the read. */ + return serf_bucket_read(bucket, requested, data, len); +} + +static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + /* Seralize our private data into a new aggregate bucket. */ + serialize_data(bucket); + + /* Delegate to the "new" aggregate bucket to do the readline. */ + return serf_bucket_readline(bucket, acceptable, found, data, len); +} + +static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket, + apr_size_t requested, + int vecs_size, + struct iovec *vecs, + int *vecs_used) +{ + /* Seralize our private data into a new aggregate bucket. */ + serialize_data(bucket); + + /* Delegate to the "new" aggregate bucket to do the read. */ + return serf_bucket_read_iovec(bucket, requested, + vecs_size, vecs, vecs_used); +} + +static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + /* Seralize our private data into a new aggregate bucket. */ + serialize_data(bucket); + + /* Delegate to the "new" aggregate bucket to do the peek. */ + return serf_bucket_peek(bucket, data, len); +} + +const serf_bucket_type_t serf_bucket_type_bwtp_frame = { + "BWTP-FRAME", + serf_bwtp_frame_read, + serf_bwtp_frame_readline, + serf_bwtp_frame_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + serf_bwtp_frame_peek, + serf_default_destroy_and_data, +}; + + +serf_bucket_t *serf_bucket_bwtp_incoming_frame_create( + serf_bucket_t *stream, + serf_bucket_alloc_t *allocator) +{ + incoming_context_t *ctx; + + ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->stream = stream; + ctx->body = NULL; + ctx->headers = serf_bucket_headers_create(allocator); + ctx->state = STATE_STATUS_LINE; + ctx->length = 0; + ctx->channel = -1; + ctx->phrase = NULL; + + serf_linebuf_init(&ctx->linebuf); + + return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx); +} + +static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket) +{ + incoming_context_t *ctx = bucket->data; + + if (ctx->state != STATE_STATUS_LINE && ctx->phrase) { + serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase); + } + + serf_bucket_destroy(ctx->stream); + if (ctx->body != NULL) + serf_bucket_destroy(ctx->body); + serf_bucket_destroy(ctx->headers); + + serf_default_destroy_and_data(bucket); +} + +static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable) +{ + return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable); +} + +static apr_status_t parse_status_line(incoming_context_t *ctx, + serf_bucket_alloc_t *allocator) +{ + int res; + char *reason; /* ### stupid APR interface makes this non-const */ + + /* ctx->linebuf.line should be of form: BW* */ + res = apr_date_checkmask(ctx->linebuf.line, "BW*"); + if (!res) { + /* Not an BWTP response? Well, at least we won't understand it. */ + return APR_EGENERAL; + } + + if (ctx->linebuf.line[2] == 'H') { + ctx->type = 0; + } + else if (ctx->linebuf.line[2] == 'M') { + ctx->type = 1; + } + else { + ctx->type = -1; + } + + ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16); + + /* Skip leading spaces for the reason string. */ + if (apr_isspace(*reason)) { + reason++; + } + + ctx->length = apr_strtoi64(reason, &reason, 16); + + /* Skip leading spaces for the reason string. */ + if (reason - ctx->linebuf.line < ctx->linebuf.used) { + if (apr_isspace(*reason)) { + reason++; + } + + ctx->phrase = serf_bstrmemdup(allocator, reason, + ctx->linebuf.used + - (reason - ctx->linebuf.line)); + } else { + ctx->phrase = NULL; + } + + return APR_SUCCESS; +} + +/* This code should be replaced with header buckets. */ +static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx) +{ + apr_status_t status; + + /* RFC 2616 says that CRLF is the only line ending, but we can easily + * accept any kind of line ending. + */ + status = fetch_line(ctx, SERF_NEWLINE_ANY); + if (SERF_BUCKET_READ_ERROR(status)) { + return status; + } + /* Something was read. Process it. */ + + if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { + const char *end_key; + const char *c; + + end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used); + if (!c) { + /* Bad headers? */ + return APR_EGENERAL; + } + + /* Skip over initial : and spaces. */ + while (apr_isspace(*++c)) + continue; + + /* Always copy the headers (from the linebuf into new mem). */ + /* ### we should be able to optimize some mem copies */ + serf_bucket_headers_setx( + ctx->headers, + ctx->linebuf.line, end_key - ctx->linebuf.line, 1, + c, ctx->linebuf.line + ctx->linebuf.used - c, 1); + } + + return status; +} + +/* Perform one iteration of the state machine. + * + * Will return when one the following conditions occurred: + * 1) a state change + * 2) an error + * 3) the stream is not ready or at EOF + * 4) APR_SUCCESS, meaning the machine can be run again immediately + */ +static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx) +{ + apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */ + + switch (ctx->state) { + case STATE_STATUS_LINE: + /* RFC 2616 says that CRLF is the only line ending, but we can easily + * accept any kind of line ending. + */ + status = fetch_line(ctx, SERF_NEWLINE_ANY); + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { + /* The Status-Line is in the line buffer. Process it. */ + status = parse_status_line(ctx, bkt->allocator); + if (status) + return status; + + if (ctx->length) { + ctx->body = + serf_bucket_barrier_create(ctx->stream, bkt->allocator); + ctx->body = serf_bucket_limit_create(ctx->body, ctx->length, + bkt->allocator); + if (!ctx->type) { + ctx->state = STATE_HEADERS; + } else { + ctx->state = STATE_BODY; + } + } else { + ctx->state = STATE_DONE; + } + } + else { + /* The connection closed before we could get the next + * response. Treat the request as lost so that our upper + * end knows the server never tried to give us a response. + */ + if (APR_STATUS_IS_EOF(status)) { + return SERF_ERROR_REQUEST_LOST; + } + } + break; + case STATE_HEADERS: + status = fetch_headers(ctx->body, ctx); + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + /* If an empty line was read, then we hit the end of the headers. + * Move on to the body. + */ + if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) { + /* Advance the state. */ + ctx->state = STATE_DONE; + } + break; + case STATE_BODY: + /* Don't do anything. */ + break; + case STATE_DONE: + return APR_EOF; + default: + /* Not reachable */ + return APR_EGENERAL; + } + + return status; +} + +static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx) +{ + apr_status_t status; + + /* Keep reading and moving through states if we aren't at the BODY */ + while (ctx->state != STATE_BODY) { + status = run_machine(bkt, ctx); + + /* Anything other than APR_SUCCESS means that we cannot immediately + * read again (for now). + */ + if (status) + return status; + } + /* in STATE_BODY */ + + return APR_SUCCESS; +} + +apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers( + serf_bucket_t *bucket) +{ + incoming_context_t *ctx = bucket->data; + + return wait_for_body(bucket, ctx); +} + +static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + incoming_context_t *ctx = bucket->data; + apr_status_t rv; + + rv = wait_for_body(bucket, ctx); + if (rv) { + /* It's not possible to have read anything yet! */ + if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) { + *len = 0; + } + return rv; + } + + rv = serf_bucket_read(ctx->body, requested, data, len); + if (APR_STATUS_IS_EOF(rv)) { + ctx->state = STATE_DONE; + } + return rv; +} + +static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + incoming_context_t *ctx = bucket->data; + apr_status_t rv; + + rv = wait_for_body(bucket, ctx); + if (rv) { + return rv; + } + + /* Delegate to the stream bucket to do the readline. */ + return serf_bucket_readline(ctx->body, acceptable, found, data, len); +} + +/* ### need to implement */ +#define bwtp_incoming_peek NULL + +const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = { + "BWTP-INCOMING", + bwtp_incoming_read, + bwtp_incoming_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + bwtp_incoming_peek, + bwtp_incoming_destroy_and_data, +}; |