diff options
Diffstat (limited to 'outgoing.c')
-rw-r--r-- | outgoing.c | 1431 |
1 files changed, 1431 insertions, 0 deletions
diff --git a/outgoing.c b/outgoing.c new file mode 100644 index 0000000..53fac0a --- /dev/null +++ b/outgoing.c @@ -0,0 +1,1431 @@ +/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <apr_pools.h> +#include <apr_poll.h> +#include <apr_version.h> + +#include "serf.h" +#include "serf_bucket_util.h" + +#include "serf_private.h" + +/* cleanup for sockets */ +static apr_status_t clean_skt(void *data) +{ + serf_connection_t *conn = data; + apr_status_t status = APR_SUCCESS; + + if (conn->skt) { + status = apr_socket_close(conn->skt); + conn->skt = NULL; + } + + return status; +} + +static apr_status_t clean_resp(void *data) +{ + serf_request_t *request = data; + + /* The request's RESPOOL is being cleared. */ + + /* If the response has allocated some buckets, then destroy them (since + the bucket may hold resources other than memory in RESPOOL). Also + make sure to set their fields to NULL so connection closure does + not attempt to free them again. */ + if (request->resp_bkt) { + serf_bucket_destroy(request->resp_bkt); + request->resp_bkt = NULL; + } + if (request->req_bkt) { + serf_bucket_destroy(request->req_bkt); + request->req_bkt = NULL; + } + + /* ### should we worry about debug stuff, like that performed in + ### destroy_request()? should we worry about calling req->handler + ### to notify this "cancellation" due to pool clearing? */ + + /* This pool just got cleared/destroyed. Don't try to destroy the pool + (again) when the request is canceled. */ + request->respool = NULL; + + return APR_SUCCESS; +} + +/* cleanup for conns */ +static apr_status_t clean_conn(void *data) +{ + serf_connection_t *conn = data; + + serf_connection_close(conn); + + return APR_SUCCESS; +} + +/* Update the pollset for this connection. We tweak the pollset based on + * whether we want to read and/or write, given conditions within the + * connection. If the connection is not (yet) in the pollset, then it + * will be added. + */ +apr_status_t serf__conn_update_pollset(serf_connection_t *conn) +{ + serf_context_t *ctx = conn->ctx; + apr_status_t status; + apr_pollfd_t desc = { 0 }; + + if (!conn->skt) { + return APR_SUCCESS; + } + + /* Remove the socket from the poll set. */ + desc.desc_type = APR_POLL_SOCKET; + desc.desc.s = conn->skt; + desc.reqevents = conn->reqevents; + + status = ctx->pollset_rm(ctx->pollset_baton, + &desc, conn); + if (status && !APR_STATUS_IS_NOTFOUND(status)) + return status; + + /* Now put it back in with the correct read/write values. */ + desc.reqevents = APR_POLLHUP | APR_POLLERR; + if (conn->requests) { + /* If there are any outstanding events, then we want to read. */ + /* ### not true. we only want to read IF we have sent some data */ + desc.reqevents |= APR_POLLIN; + + /* If the connection has unwritten data, or there are any requests + * that still have buckets to write out, then we want to write. + */ + if (conn->vec_len) + desc.reqevents |= APR_POLLOUT; + else { + serf_request_t *request = conn->requests; + + if ((conn->probable_keepalive_limit && + conn->completed_requests > conn->probable_keepalive_limit) || + (conn->max_outstanding_requests && + conn->completed_requests - conn->completed_responses >= + conn->max_outstanding_requests)) { + /* we wouldn't try to write any way right now. */ + } + else { + while (request != NULL && request->req_bkt == NULL && + request->written) + request = request->next; + if (request != NULL) + desc.reqevents |= APR_POLLOUT; + } + } + } + + /* If we can have async responses, always look for something to read. */ + if (conn->async_responses) { + desc.reqevents |= APR_POLLIN; + } + + /* save our reqevents, so we can pass it in to remove later. */ + conn->reqevents = desc.reqevents; + + /* Note: even if we don't want to read/write this socket, we still + * want to poll it for hangups and errors. + */ + return ctx->pollset_add(ctx->pollset_baton, + &desc, &conn->baton); +} + +#ifdef SERF_DEBUG_BUCKET_USE + +/* Make sure all response buckets were drained. */ +static void check_buckets_drained(serf_connection_t *conn) +{ + serf_request_t *request = conn->requests; + + for ( ; request ; request = request->next ) { + if (request->resp_bkt != NULL) { + /* ### crap. can't do this. this allocator may have un-drained + * ### REQUEST buckets. + */ + /* serf_debug__entered_loop(request->resp_bkt->allocator); */ + /* ### for now, pretend we closed the conn (resets the tracking) */ + serf_debug__closed_conn(request->resp_bkt->allocator); + } + } +} + +#endif + +/* Create and connect sockets for any connections which don't have them + * yet. This is the core of our lazy-connect behavior. + */ +apr_status_t serf__open_connections(serf_context_t *ctx) +{ + int i; + + for (i = ctx->conns->nelts; i--; ) { + serf_connection_t *conn = GET_CONN(ctx, i); + apr_status_t status; + apr_socket_t *skt; + apr_sockaddr_t *serv_addr; + + conn->seen_in_pollset = 0; + + if (conn->skt != NULL) { +#ifdef SERF_DEBUG_BUCKET_USE + check_buckets_drained(conn); +#endif + continue; + } + + /* Delay opening until we have something to deliver! */ + if (conn->requests == NULL) { + continue; + } + + apr_pool_clear(conn->skt_pool); + apr_pool_cleanup_register(conn->skt_pool, conn, clean_skt, clean_skt); + + /* Do we have to connect to a proxy server? */ + if (ctx->proxy_address) + serv_addr = ctx->proxy_address; + else + serv_addr = conn->address; + + if ((status = apr_socket_create(&skt, serv_addr->family, + SOCK_STREAM, +#if APR_MAJOR_VERSION > 0 + APR_PROTO_TCP, +#endif + conn->skt_pool)) != APR_SUCCESS) + return status; + + /* Set the socket to be non-blocking */ + if ((status = apr_socket_timeout_set(skt, 0)) != APR_SUCCESS) + return status; + + /* Disable Nagle's algorithm */ + if ((status = apr_socket_opt_set(skt, + APR_TCP_NODELAY, 1)) != APR_SUCCESS) + return status; + + /* Configured. Store it into the connection now. */ + conn->skt = skt; + + /* Now that the socket is set up, let's connect it. This should + * return immediately. + */ + if ((status = apr_socket_connect(skt, + serv_addr)) != APR_SUCCESS) { + if (!APR_STATUS_IS_EINPROGRESS(status)) + return status; + } + + /* Flag our pollset as dirty now that we have a new socket. */ + conn->dirty_conn = 1; + ctx->dirty_pollset = 1; + + /* If the authentication was already started on another connection, + prepare this connection (it might be possible to skip some + part of the handshaking). */ + if (ctx->proxy_address) { + if (conn->ctx->proxy_authn_info.scheme) + conn->ctx->proxy_authn_info.scheme->init_conn_func(407, conn, + conn->pool); + } + + if (conn->ctx->authn_info.scheme) + conn->ctx->authn_info.scheme->init_conn_func(401, conn, + conn->pool); + + /* Does this connection require a SSL tunnel over the proxy? */ + if (ctx->proxy_address && strcmp(conn->host_info.scheme, "https") == 0) + serf__ssltunnel_connect(conn); + else + conn->state = SERF_CONN_CONNECTED; + } + + return APR_SUCCESS; +} + +static apr_status_t no_more_writes(serf_connection_t *conn, + serf_request_t *request) +{ + /* Note that we should hold new requests until we open our new socket. */ + conn->state = SERF_CONN_CLOSING; + + /* We can take the *next* request in our list and assume it hasn't + * been written yet and 'save' it for the new socket. + */ + conn->hold_requests = request->next; + conn->hold_requests_tail = conn->requests_tail; + request->next = NULL; + conn->requests_tail = request; + + /* Clear our iovec. */ + conn->vec_len = 0; + + /* Update the pollset to know we don't want to write on this socket any + * more. + */ + conn->dirty_conn = 1; + conn->ctx->dirty_pollset = 1; + return APR_SUCCESS; +} + +/* Read the 'Connection' header from the response. Return SERF_ERROR_CLOSING if + * the header contains value 'close' indicating the server is closing the + * connection right after this response. + * Otherwise returns APR_SUCCESS. + */ +static apr_status_t is_conn_closing(serf_bucket_t *response) +{ + serf_bucket_t *hdrs; + const char *val; + + hdrs = serf_bucket_response_get_headers(response); + val = serf_bucket_headers_get(hdrs, "Connection"); + if (val && strcasecmp("close", val) == 0) + { + return SERF_ERROR_CLOSING; + } + + return APR_SUCCESS; +} + +static void link_requests(serf_request_t **list, serf_request_t **tail, + serf_request_t *request) +{ + if (*list == NULL) { + *list = request; + *tail = request; + } + else { + (*tail)->next = request; + *tail = request; + } +} + +static apr_status_t destroy_request(serf_request_t *request) +{ + serf_connection_t *conn = request->conn; + + /* The request and response buckets are no longer needed, + nor is the request's pool. */ + if (request->resp_bkt) { + serf_debug__closed_conn(request->resp_bkt->allocator); + serf_bucket_destroy(request->resp_bkt); + request->resp_bkt = NULL; + } + if (request->req_bkt) { + serf_debug__closed_conn(request->req_bkt->allocator); + serf_bucket_destroy(request->req_bkt); + request->req_bkt = NULL; + } + + serf_debug__bucket_alloc_check(request->allocator); + if (request->respool) { + /* ### unregister the pool cleanup for self? */ + apr_pool_destroy(request->respool); + } + + serf_bucket_mem_free(conn->allocator, request); + + return APR_SUCCESS; +} + +static apr_status_t cancel_request(serf_request_t *request, + serf_request_t **list, + int notify_request) +{ + /* If we haven't run setup, then we won't have a handler to call. */ + if (request->handler && notify_request) { + /* We actually don't care what the handler returns. + * We have bigger matters at hand. + */ + (*request->handler)(request, NULL, request->handler_baton, + request->respool); + } + + if (*list == request) { + *list = request->next; + } + else { + serf_request_t *scan = *list; + + while (scan->next && scan->next != request) + scan = scan->next; + + if (scan->next) { + scan->next = scan->next->next; + } + } + + return destroy_request(request); +} + +static apr_status_t remove_connection(serf_context_t *ctx, + serf_connection_t *conn) +{ + apr_pollfd_t desc = { 0 }; + + desc.desc_type = APR_POLL_SOCKET; + desc.desc.s = conn->skt; + desc.reqevents = conn->reqevents; + + return ctx->pollset_rm(ctx->pollset_baton, + &desc, conn); +} + +static void destroy_ostream(serf_connection_t *conn) +{ + if (conn->ostream_head != NULL) { + serf_bucket_destroy(conn->ostream_head); + conn->ostream_head = NULL; + conn->ostream_tail = NULL; + } +} + +/* A socket was closed, inform the application. */ +static void handle_conn_closed(serf_connection_t *conn, apr_status_t status) +{ + (*conn->closed)(conn, conn->closed_baton, status, + conn->pool); +} + +static apr_status_t reset_connection(serf_connection_t *conn, + int requeue_requests) +{ + serf_context_t *ctx = conn->ctx; + apr_status_t status; + serf_request_t *old_reqs, *held_reqs, *held_reqs_tail; + + conn->probable_keepalive_limit = conn->completed_responses; + conn->completed_requests = 0; + conn->completed_responses = 0; + + old_reqs = conn->requests; + held_reqs = conn->hold_requests; + held_reqs_tail = conn->hold_requests_tail; + + if (conn->state == SERF_CONN_CLOSING) { + conn->hold_requests = NULL; + conn->hold_requests_tail = NULL; + } + + conn->requests = NULL; + conn->requests_tail = NULL; + + while (old_reqs) { + /* If we haven't started to write the connection, bring it over + * unchanged to our new socket. Otherwise, call the cancel function. + */ + if (requeue_requests && !old_reqs->written) { + serf_request_t *req = old_reqs; + old_reqs = old_reqs->next; + req->next = NULL; + link_requests(&conn->requests, &conn->requests_tail, req); + } + else { + cancel_request(old_reqs, &old_reqs, requeue_requests); + } + } + + if (conn->requests_tail) { + conn->requests_tail->next = held_reqs; + } + else { + conn->requests = held_reqs; + } + if (held_reqs_tail) { + conn->requests_tail = held_reqs_tail; + } + + if (conn->skt != NULL) { + remove_connection(ctx, conn); + status = apr_socket_close(conn->skt); + if (conn->closed != NULL) { + handle_conn_closed(conn, status); + } + conn->skt = NULL; + } + + if (conn->stream != NULL) { + serf_bucket_destroy(conn->stream); + conn->stream = NULL; + } + + destroy_ostream(conn); + + /* Don't try to resume any writes */ + conn->vec_len = 0; + + conn->dirty_conn = 1; + conn->ctx->dirty_pollset = 1; + conn->state = SERF_CONN_INIT; + + conn->status = APR_SUCCESS; + + /* Let our context know that we've 'reset' the socket already. */ + conn->seen_in_pollset |= APR_POLLHUP; + + /* Found the connection. Closed it. All done. */ + return APR_SUCCESS; +} + +static apr_status_t socket_writev(serf_connection_t *conn) +{ + apr_size_t written; + apr_status_t status; + + status = apr_socket_sendv(conn->skt, conn->vec, + conn->vec_len, &written); + + /* did we write everything? */ + if (written) { + apr_size_t len = 0; + int i; + + for (i = 0; i < conn->vec_len; i++) { + len += conn->vec[i].iov_len; + if (written < len) { + if (i) { + memmove(conn->vec, &conn->vec[i], + sizeof(struct iovec) * (conn->vec_len - i)); + conn->vec_len -= i; + } + conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + (conn->vec[0].iov_len - (len - written)); + conn->vec[0].iov_len = len - written; + break; + } + } + if (len == written) { + conn->vec_len = 0; + } + + /* Log progress information */ + serf__context_progress_delta(conn->ctx, 0, written); + } + + return status; +} + +static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket) +{ + serf_connection_t *conn = baton; + conn->hit_eof = 1; + return APR_EAGAIN; +} + +static apr_status_t do_conn_setup(serf_connection_t *conn) +{ + apr_status_t status; + serf_bucket_t *ostream; + + if (conn->ostream_head == NULL) { + conn->ostream_head = serf_bucket_aggregate_create(conn->allocator); + } + + if (conn->ostream_tail == NULL) { + conn->ostream_tail = serf__bucket_stream_create(conn->allocator, + detect_eof, + conn); + } + + ostream = conn->ostream_tail; + + status = (*conn->setup)(conn->skt, + &conn->stream, + &ostream, + conn->setup_baton, + conn->pool); + if (status) { + /* extra destroy here since it wasn't added to the head bucket yet. */ + serf_bucket_destroy(conn->ostream_tail); + destroy_ostream(conn); + return status; + } + + serf_bucket_aggregate_append(conn->ostream_head, + ostream); + + return status; +} + +/* Set up the input and output stream buckets. + When a tunnel over an http proxy is needed, create a socket bucket and + empty aggregate bucket for sending and receiving unencrypted requests + over the socket. + + After the tunnel is there, or no tunnel was needed, ask the application + to create the input and output buckets, which should take care of the + [en/de]cryption. +*/ + +static apr_status_t prepare_conn_streams(serf_connection_t *conn, + serf_bucket_t **istream, + serf_bucket_t **ostreamt, + serf_bucket_t **ostreamh) +{ + apr_status_t status; + + /* Do we need a SSL tunnel first? */ + if (conn->state == SERF_CONN_CONNECTED) { + /* If the connection does not have an associated bucket, then + * call the setup callback to get one. + */ + if (conn->stream == NULL) { + status = do_conn_setup(conn); + if (status) { + return status; + } + } + *ostreamt = conn->ostream_tail; + *ostreamh = conn->ostream_head; + *istream = conn->stream; + } else { + /* SSL tunnel needed and not set up yet, get a direct unencrypted + stream for this socket */ + if (conn->stream == NULL) { + *istream = serf_bucket_socket_create(conn->skt, + conn->allocator); + } + /* Don't create the ostream bucket chain including the ssl_encrypt + bucket yet. This ensure the CONNECT request is sent unencrypted + to the proxy. */ + *ostreamt = *ostreamh = conn->ssltunnel_ostream; + } + + return APR_SUCCESS; +} + +/* write data out to the connection */ +static apr_status_t write_to_connection(serf_connection_t *conn) +{ + serf_request_t *request = conn->requests; + + if (conn->probable_keepalive_limit && + conn->completed_requests > conn->probable_keepalive_limit) { + + conn->dirty_conn = 1; + conn->ctx->dirty_pollset = 1; + + /* backoff for now. */ + return APR_SUCCESS; + } + + /* Find a request that has data which needs to be delivered. */ + while (request != NULL && + request->req_bkt == NULL && request->written) + request = request->next; + + /* assert: request != NULL || conn->vec_len */ + + /* Keep reading and sending until we run out of stuff to read, or + * writing would block. + */ + while (1) { + int stop_reading = 0; + apr_status_t status; + apr_status_t read_status; + serf_bucket_t *ostreamt, *ostreamh; + int max_outstanding_requests = conn->max_outstanding_requests; + + /* If we're setting up an ssl tunnel, we can't send real requests + at yet, as they need to be encrypted and our encrypt buckets + aren't created yet as we still need to read the unencrypted + response of the CONNECT request. */ + if (conn->state != SERF_CONN_CONNECTED) + max_outstanding_requests = 1; + + if (max_outstanding_requests && + conn->completed_requests - + conn->completed_responses >= max_outstanding_requests) { + /* backoff for now. */ + return APR_SUCCESS; + } + + /* If we have unwritten data, then write what we can. */ + while (conn->vec_len) { + status = socket_writev(conn); + + /* If the write would have blocked, then we're done. Don't try + * to write anything else to the socket. + */ + if (APR_STATUS_IS_EAGAIN(status)) + return APR_SUCCESS; + if (APR_STATUS_IS_EPIPE(status)) + return no_more_writes(conn, request); + if (status) + return status; + } + /* ### can we have a short write, yet no EAGAIN? a short write + ### would imply unwritten_len > 0 ... */ + /* assert: unwritten_len == 0. */ + + /* We may need to move forward to a request which has something + * to write. + */ + while (request != NULL && + request->req_bkt == NULL && request->written) + request = request->next; + + if (request == NULL) { + /* No more requests (with data) are registered with the + * connection. Let's update the pollset so that we don't + * try to write to this socket again. + */ + conn->dirty_conn = 1; + conn->ctx->dirty_pollset = 1; + return APR_SUCCESS; + } + + status = prepare_conn_streams(conn, &conn->stream, &ostreamt, &ostreamh); + if (status) { + return status; + } + + if (request->req_bkt == NULL) { + /* Now that we are about to serve the request, allocate a pool. */ + apr_pool_create(&request->respool, conn->pool); + request->allocator = serf_bucket_allocator_create(request->respool, + NULL, NULL); + apr_pool_cleanup_register(request->respool, request, + clean_resp, clean_resp); + + /* Fill in the rest of the values for the request. */ + read_status = request->setup(request, request->setup_baton, + &request->req_bkt, + &request->acceptor, + &request->acceptor_baton, + &request->handler, + &request->handler_baton, + request->respool); + + if (read_status) { + /* Something bad happened. Propagate any errors. */ + return read_status; + } + + request->written = 1; + serf_bucket_aggregate_append(ostreamt, request->req_bkt); + } + + /* ### optimize at some point by using read_for_sendfile */ + read_status = serf_bucket_read_iovec(ostreamh, + SERF_READ_ALL_AVAIL, + IOV_MAX, + conn->vec, + &conn->vec_len); + + if (!conn->hit_eof) { + if (APR_STATUS_IS_EAGAIN(read_status) || + read_status == SERF_ERROR_WAIT_CONN) { + /* We read some stuff, but should not try to read again. */ + stop_reading = 1; + + /* ### we should avoid looking for writability for a while so + ### that (hopefully) something will appear in the bucket so + ### we can actually write something. otherwise, we could + ### end up in a CPU spin: socket wants something, but we + ### don't have anything (and keep returning EAGAIN) + */ + } + else if (read_status && !APR_STATUS_IS_EOF(read_status)) { + /* Something bad happened. Propagate any errors. */ + return read_status; + } + } + + /* If we got some data, then deliver it. */ + /* ### what to do if we got no data?? is that a problem? */ + if (conn->vec_len > 0) { + status = socket_writev(conn); + + /* If we can't write any more, or an error occurred, then + * we're done here. + */ + if (APR_STATUS_IS_EAGAIN(status)) + return APR_SUCCESS; + if (APR_STATUS_IS_EPIPE(status)) + return no_more_writes(conn, request); + if (APR_STATUS_IS_ECONNRESET(status)) { + return no_more_writes(conn, request); + } + if (status) + return status; + } + + if (read_status == SERF_ERROR_WAIT_CONN) { + stop_reading = 1; + } + else if (read_status && conn->hit_eof && conn->vec_len == 0) { + /* If we hit the end of the request bucket and all of its data has + * been written, then clear it out to signify that we're done + * sending the request. On the next iteration through this loop: + * - if there are remaining bytes they will be written, and as the + * request bucket will be completely read it will be destroyed then. + * - we'll see if there are other requests that need to be sent + * ("pipelining"). + */ + conn->hit_eof = 0; + serf_bucket_destroy(request->req_bkt); + request->req_bkt = NULL; + + /* If our connection has async responses enabled, we're not + * going to get a reply back, so kill the request. + */ + if (conn->async_responses) { + conn->requests = request->next; + destroy_request(request); + } + + conn->completed_requests++; + + if (conn->probable_keepalive_limit && + conn->completed_requests > conn->probable_keepalive_limit) { + /* backoff for now. */ + stop_reading = 1; + } + } + + if (stop_reading) { + return APR_SUCCESS; + } + } + /* NOTREACHED */ +} + +/* A response message was received from the server, so call + the handler as specified on the original request. */ +static apr_status_t handle_response(serf_request_t *request, + apr_pool_t *pool) +{ + apr_status_t status = APR_SUCCESS; + int consumed_response = 0; + + /* Only enable the new authentication framework if the program has + * registered an authentication credential callback. + * + * This permits older Serf apps to still handle authentication + * themselves by not registering credential callbacks. + */ + if (request->conn->ctx->cred_cb) { + status = serf__handle_auth_response(&consumed_response, + request, + request->resp_bkt, + request->handler_baton, + pool); + + /* If there was an error reading the response (maybe there wasn't + enough data available), don't bother passing the response to the + application. + + If the authentication was tried, but failed, pass the response + to the application, maybe it can do better. */ + if (APR_STATUS_IS_EOF(status) || + APR_STATUS_IS_EAGAIN(status)) { + return status; + } + } + + if (!consumed_response) { + return (*request->handler)(request, + request->resp_bkt, + request->handler_baton, + pool); + } + + return status; +} + +/* An async response message was received from the server. */ +static apr_status_t handle_async_response(serf_connection_t *conn, + apr_pool_t *pool) +{ + apr_status_t status; + + if (conn->current_async_response == NULL) { + conn->current_async_response = + (*conn->async_acceptor)(NULL, conn->stream, + conn->async_acceptor_baton, pool); + } + + status = (*conn->async_handler)(NULL, conn->current_async_response, + conn->async_handler_baton, pool); + + if (APR_STATUS_IS_EOF(status)) { + serf_bucket_destroy(conn->current_async_response); + conn->current_async_response = NULL; + status = APR_SUCCESS; + } + + return status; +} + +/* read data from the connection */ +static apr_status_t read_from_connection(serf_connection_t *conn) +{ + apr_status_t status; + apr_pool_t *tmppool; + int close_connection = FALSE; + + /* Whatever is coming in on the socket corresponds to the first request + * on our chain. + */ + serf_request_t *request = conn->requests; + + /* assert: request != NULL */ + + if ((status = apr_pool_create(&tmppool, conn->pool)) != APR_SUCCESS) + goto error; + + /* Invoke response handlers until we have no more work. */ + while (1) { + serf_bucket_t *dummy1, *dummy2; + + apr_pool_clear(tmppool); + + /* Only interested in the input stream here. */ + status = prepare_conn_streams(conn, &conn->stream, &dummy1, &dummy2); + if (status) { + goto error; + } + + /* We have a different codepath when we can have async responses. */ + if (conn->async_responses) { + /* TODO What about socket errors? */ + status = handle_async_response(conn, tmppool); + if (APR_STATUS_IS_EAGAIN(status)) { + status = APR_SUCCESS; + goto error; + } + if (status) { + goto error; + } + continue; + } + + /* We are reading a response for a request we haven't + * written yet! + * + * This shouldn't normally happen EXCEPT: + * + * 1) when the other end has closed the socket and we're + * pending an EOF return. + * 2) Doing the initial SSL handshake - we'll get EAGAIN + * as the SSL buckets will hide the handshake from us + * but not return any data. + * + * In these cases, we should not receive any actual user data. + * + * If we see an EOF (due to an expired timeout), we'll reset the + * connection and open a new one. + */ + if (request->req_bkt || !request->written) { + const char *data; + apr_size_t len; + + status = serf_bucket_read(conn->stream, SERF_READ_ALL_AVAIL, + &data, &len); + + if (!status && len) { + status = APR_EGENERAL; + } + else if (APR_STATUS_IS_EOF(status)) { + reset_connection(conn, 1); + status = APR_SUCCESS; + } + else if (APR_STATUS_IS_EAGAIN(status)) { + status = APR_SUCCESS; + } + + goto error; + } + + /* If the request doesn't have a response bucket, then call the + * acceptor to get one created. + */ + if (request->resp_bkt == NULL) { + request->resp_bkt = (*request->acceptor)(request, conn->stream, + request->acceptor_baton, + tmppool); + apr_pool_clear(tmppool); + } + + status = handle_response(request, tmppool); + + /* Some systems will not generate a HUP poll event so we have to + * handle the ECONNRESET issue here. + */ + if (APR_STATUS_IS_ECONNRESET(status) || + status == SERF_ERROR_REQUEST_LOST) { + reset_connection(conn, 1); + status = APR_SUCCESS; + goto error; + } + + /* If our response handler says it can't do anything more, we now + * treat that as a success. + */ + if (APR_STATUS_IS_EAGAIN(status)) { + status = APR_SUCCESS; + goto error; + } + + /* If we received APR_SUCCESS, run this loop again. */ + if (!status) { + continue; + } + + close_connection = is_conn_closing(request->resp_bkt); + + if (!APR_STATUS_IS_EOF(status) && + close_connection != SERF_ERROR_CLOSING) { + /* Whether success, or an error, there is no more to do unless + * this request has been completed. + */ + goto error; + } + + /* The request has been fully-delivered, and the response has + * been fully-read. Remove it from our queue and loop to read + * another response. + */ + conn->requests = request->next; + + destroy_request(request); + + request = conn->requests; + + /* If we're truly empty, update our tail. */ + if (request == NULL) { + conn->requests_tail = NULL; + } + + conn->completed_responses++; + + /* We've to rebuild pollset since completed_responses is changed. */ + conn->dirty_conn = 1; + conn->ctx->dirty_pollset = 1; + + /* This means that we're being advised that the connection is done. */ + if (close_connection == SERF_ERROR_CLOSING) { + reset_connection(conn, 1); + if (APR_STATUS_IS_EOF(status)) + status = APR_SUCCESS; + goto error; + } + + /* The server is suddenly deciding to serve more responses than we've + * seen before. + * + * Let our requests go. + */ + if (conn->probable_keepalive_limit && + conn->completed_responses > conn->probable_keepalive_limit) { + conn->probable_keepalive_limit = 0; + } + + /* If we just ran out of requests or have unwritten requests, then + * update the pollset. We don't want to read from this socket any + * more. We are definitely done with this loop, too. + */ + if (request == NULL || !request->written) { + conn->dirty_conn = 1; + conn->ctx->dirty_pollset = 1; + status = APR_SUCCESS; + goto error; + } + } + +error: + apr_pool_destroy(tmppool); + return status; +} + +/* process all events on the connection */ +apr_status_t serf__process_connection(serf_connection_t *conn, + apr_int16_t events) +{ + apr_status_t status; + + /* POLLHUP/ERR should come after POLLIN so if there's an error message or + * the like sitting on the connection, we give the app a chance to read + * it before we trigger a reset condition. + */ + if ((events & APR_POLLIN) != 0) { + if ((status = read_from_connection(conn)) != APR_SUCCESS) + return status; + + /* If we decided to reset our connection, return now as we don't + * want to write. + */ + if ((conn->seen_in_pollset & APR_POLLHUP) != 0) { + return APR_SUCCESS; + } + } + if ((events & APR_POLLHUP) != 0) { + /* The connection got reset by the server. On Windows this can happen + when all data is read, so just cleanup the connection and open + a new one. */ + return reset_connection(conn, 1); + } + if ((events & APR_POLLERR) != 0) { + /* We might be talking to a buggy HTTP server that doesn't + * do lingering-close. (httpd < 2.1.8 does this.) + * + * See: + * + * http://issues.apache.org/bugzilla/show_bug.cgi?id=35292 + */ + if (conn->completed_requests && !conn->probable_keepalive_limit) { + return reset_connection(conn, 1); + } + return APR_EGENERAL; + } + if ((events & APR_POLLOUT) != 0) { + if ((status = write_to_connection(conn)) != APR_SUCCESS) + return status; + } + return APR_SUCCESS; +} + +serf_connection_t *serf_connection_create( + serf_context_t *ctx, + apr_sockaddr_t *address, + serf_connection_setup_t setup, + void *setup_baton, + serf_connection_closed_t closed, + void *closed_baton, + apr_pool_t *pool) +{ + serf_connection_t *conn = apr_pcalloc(pool, sizeof(*conn)); + + conn->ctx = ctx; + conn->status = APR_SUCCESS; + conn->address = address; + conn->setup = setup; + conn->setup_baton = setup_baton; + conn->closed = closed; + conn->closed_baton = closed_baton; + conn->pool = pool; + conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL); + conn->stream = NULL; + conn->ostream_head = NULL; + conn->ostream_tail = NULL; + conn->baton.type = SERF_IO_CONN; + conn->baton.u.conn = conn; + conn->hit_eof = 0; + conn->state = SERF_CONN_INIT; + + /* Create a subpool for our connection. */ + apr_pool_create(&conn->skt_pool, conn->pool); + + /* register a cleanup */ + apr_pool_cleanup_register(conn->pool, conn, clean_conn, apr_pool_cleanup_null); + + /* Add the connection to the context. */ + *(serf_connection_t **)apr_array_push(ctx->conns) = conn; + + return conn; +} + +apr_status_t serf_connection_create2( + serf_connection_t **conn, + serf_context_t *ctx, + apr_uri_t host_info, + serf_connection_setup_t setup, + void *setup_baton, + serf_connection_closed_t closed, + void *closed_baton, + apr_pool_t *pool) +{ + apr_status_t status; + serf_connection_t *c; + apr_sockaddr_t *host_address; + + /* Parse the url, store the address of the server. */ + status = apr_sockaddr_info_get(&host_address, + host_info.hostname, + APR_UNSPEC, host_info.port, 0, pool); + if (status) + return status; + + c = serf_connection_create(ctx, host_address, setup, setup_baton, + closed, closed_baton, pool); + + /* We're not interested in the path following the hostname. */ + c->host_url = apr_uri_unparse(c->pool, + &host_info, + APR_URI_UNP_OMITPATHINFO); + c->host_info = host_info; + + *conn = c; + + return status; +} + +apr_status_t serf_connection_reset( + serf_connection_t *conn) +{ + return reset_connection(conn, 0); +} + + +apr_status_t serf_connection_close( + serf_connection_t *conn) +{ + int i; + serf_context_t *ctx = conn->ctx; + apr_status_t status; + + for (i = ctx->conns->nelts; i--; ) { + serf_connection_t *conn_seq = GET_CONN(ctx, i); + + if (conn_seq == conn) { + while (conn->requests) { + serf_request_cancel(conn->requests); + } + if (conn->skt != NULL) { + remove_connection(ctx, conn); + status = apr_socket_close(conn->skt); + if (conn->closed != NULL) { + handle_conn_closed(conn, status); + } + conn->skt = NULL; + } + if (conn->stream != NULL) { + serf_bucket_destroy(conn->stream); + conn->stream = NULL; + } + + /* Remove the connection from the context. We don't want to + * deal with it any more. + */ + if (i < ctx->conns->nelts - 1) { + /* move later connections over this one. */ + memmove( + &GET_CONN(ctx, i), + &GET_CONN(ctx, i + 1), + (ctx->conns->nelts - i - 1) * sizeof(serf_connection_t *)); + } + --ctx->conns->nelts; + + /* Found the connection. Closed it. All done. */ + return APR_SUCCESS; + } + } + + /* We didn't find the specified connection. */ + /* ### doc talks about this w.r.t poll structures. use something else? */ + return APR_NOTFOUND; +} + + +void serf_connection_set_max_outstanding_requests( + serf_connection_t *conn, + unsigned int max_requests) +{ + conn->max_outstanding_requests = max_requests; +} + + +void serf_connection_set_async_responses( + serf_connection_t *conn, + serf_response_acceptor_t acceptor, + void *acceptor_baton, + serf_response_handler_t handler, + void *handler_baton) +{ + conn->async_responses = 1; + conn->async_acceptor = acceptor; + conn->async_acceptor_baton = acceptor_baton; + conn->async_handler = handler; + conn->async_handler_baton = handler_baton; +} + + +serf_request_t *serf_connection_request_create( + serf_connection_t *conn, + serf_request_setup_t setup, + void *setup_baton) +{ + serf_request_t *request; + + request = serf_bucket_mem_alloc(conn->allocator, sizeof(*request)); + request->conn = conn; + request->setup = setup; + request->setup_baton = setup_baton; + request->handler = NULL; + request->respool = NULL; + request->req_bkt = NULL; + request->resp_bkt = NULL; + request->priority = 0; + request->written = 0; + request->next = NULL; + + /* Link the request to the end of the request chain. */ + if (conn->state == SERF_CONN_CLOSING) { + link_requests(&conn->hold_requests, &conn->hold_requests_tail, request); + } + else { + link_requests(&conn->requests, &conn->requests_tail, request); + + /* Ensure our pollset becomes writable in context run */ + conn->ctx->dirty_pollset = 1; + conn->dirty_conn = 1; + } + + return request; +} + + +serf_request_t *serf_connection_priority_request_create( + serf_connection_t *conn, + serf_request_setup_t setup, + void *setup_baton) +{ + serf_request_t *request; + serf_request_t *iter, *prev; + + request = serf_bucket_mem_alloc(conn->allocator, sizeof(*request)); + request->conn = conn; + request->setup = setup; + request->setup_baton = setup_baton; + request->handler = NULL; + request->respool = NULL; + request->req_bkt = NULL; + request->resp_bkt = NULL; + request->priority = 1; + request->written = 0; + request->next = NULL; + + /* Link the new request after the last written request, but before all + upcoming requests. */ + if (conn->state == SERF_CONN_CLOSING) { + iter = conn->hold_requests; + } + else { + iter = conn->requests; + } + prev = NULL; + + /* Find a request that has data which needs to be delivered. */ + while (iter != NULL && iter->req_bkt == NULL && iter->written) { + prev = iter; + iter = iter->next; + } + + /* Advance to next non priority request */ + while (iter != NULL && iter->priority) { + prev = iter; + iter = iter->next; + } + + if (prev) { + request->next = iter; + prev->next = request; + } else { + request->next = iter; + if (conn->state == SERF_CONN_CLOSING) { + conn->hold_requests = request; + } + else { + conn->requests = request; + } + } + + if (conn->state != SERF_CONN_CLOSING) { + /* Ensure our pollset becomes writable in context run */ + conn->ctx->dirty_pollset = 1; + conn->dirty_conn = 1; + } + + return request; +} + + +apr_status_t serf_request_cancel(serf_request_t *request) +{ + return cancel_request(request, &request->conn->requests, 0); +} + + +apr_pool_t *serf_request_get_pool(const serf_request_t *request) +{ + return request->respool; +} + + +serf_bucket_alloc_t *serf_request_get_alloc( + const serf_request_t *request) +{ + return request->allocator; +} + + +serf_connection_t *serf_request_get_conn( + const serf_request_t *request) +{ + return request->conn; +} + + +void serf_request_set_handler( + serf_request_t *request, + const serf_response_handler_t handler, + const void **handler_baton) +{ + request->handler = handler; + request->handler_baton = handler_baton; +} + + +serf_bucket_t *serf_request_bucket_request_create( + serf_request_t *request, + const char *method, + const char *uri, + serf_bucket_t *body, + serf_bucket_alloc_t *allocator) +{ + serf_bucket_t *req_bkt, *hdrs_bkt; + serf_connection_t *conn = request->conn; + serf_context_t *ctx = conn->ctx; + + req_bkt = serf_bucket_request_create(method, uri, body, allocator); + hdrs_bkt = serf_bucket_request_get_headers(req_bkt); + + /* Proxy? */ + if (ctx->proxy_address && conn->host_url) + serf_bucket_request_set_root(req_bkt, conn->host_url); + + if (conn->host_info.hostinfo) + serf_bucket_headers_setn(hdrs_bkt, "Host", + conn->host_info.hostinfo); + + /* Setup server authorization headers */ + if (ctx->authn_info.scheme) + ctx->authn_info.scheme->setup_request_func(401, conn, method, uri, + hdrs_bkt); + + /* Setup proxy authorization headers */ + if (ctx->proxy_authn_info.scheme) + ctx->proxy_authn_info.scheme->setup_request_func(407, conn, method, + uri, hdrs_bkt); + + return req_bkt; +} |