summaryrefslogtreecommitdiff
path: root/outgoing.c
diff options
context:
space:
mode:
Diffstat (limited to 'outgoing.c')
-rw-r--r--outgoing.c1431
1 files changed, 1431 insertions, 0 deletions
diff --git a/outgoing.c b/outgoing.c
new file mode 100644
index 0000000..53fac0a
--- /dev/null
+++ b/outgoing.c
@@ -0,0 +1,1431 @@
+/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_pools.h>
+#include <apr_poll.h>
+#include <apr_version.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+
+#include "serf_private.h"
+
+/* cleanup for sockets */
+static apr_status_t clean_skt(void *data)
+{
+ serf_connection_t *conn = data;
+ apr_status_t status = APR_SUCCESS;
+
+ if (conn->skt) {
+ status = apr_socket_close(conn->skt);
+ conn->skt = NULL;
+ }
+
+ return status;
+}
+
+static apr_status_t clean_resp(void *data)
+{
+ serf_request_t *request = data;
+
+ /* The request's RESPOOL is being cleared. */
+
+ /* If the response has allocated some buckets, then destroy them (since
+ the bucket may hold resources other than memory in RESPOOL). Also
+ make sure to set their fields to NULL so connection closure does
+ not attempt to free them again. */
+ if (request->resp_bkt) {
+ serf_bucket_destroy(request->resp_bkt);
+ request->resp_bkt = NULL;
+ }
+ if (request->req_bkt) {
+ serf_bucket_destroy(request->req_bkt);
+ request->req_bkt = NULL;
+ }
+
+ /* ### should we worry about debug stuff, like that performed in
+ ### destroy_request()? should we worry about calling req->handler
+ ### to notify this "cancellation" due to pool clearing? */
+
+ /* This pool just got cleared/destroyed. Don't try to destroy the pool
+ (again) when the request is canceled. */
+ request->respool = NULL;
+
+ return APR_SUCCESS;
+}
+
+/* cleanup for conns */
+static apr_status_t clean_conn(void *data)
+{
+ serf_connection_t *conn = data;
+
+ serf_connection_close(conn);
+
+ return APR_SUCCESS;
+}
+
+/* Update the pollset for this connection. We tweak the pollset based on
+ * whether we want to read and/or write, given conditions within the
+ * connection. If the connection is not (yet) in the pollset, then it
+ * will be added.
+ */
+apr_status_t serf__conn_update_pollset(serf_connection_t *conn)
+{
+ serf_context_t *ctx = conn->ctx;
+ apr_status_t status;
+ apr_pollfd_t desc = { 0 };
+
+ if (!conn->skt) {
+ return APR_SUCCESS;
+ }
+
+ /* Remove the socket from the poll set. */
+ desc.desc_type = APR_POLL_SOCKET;
+ desc.desc.s = conn->skt;
+ desc.reqevents = conn->reqevents;
+
+ status = ctx->pollset_rm(ctx->pollset_baton,
+ &desc, conn);
+ if (status && !APR_STATUS_IS_NOTFOUND(status))
+ return status;
+
+ /* Now put it back in with the correct read/write values. */
+ desc.reqevents = APR_POLLHUP | APR_POLLERR;
+ if (conn->requests) {
+ /* If there are any outstanding events, then we want to read. */
+ /* ### not true. we only want to read IF we have sent some data */
+ desc.reqevents |= APR_POLLIN;
+
+ /* If the connection has unwritten data, or there are any requests
+ * that still have buckets to write out, then we want to write.
+ */
+ if (conn->vec_len)
+ desc.reqevents |= APR_POLLOUT;
+ else {
+ serf_request_t *request = conn->requests;
+
+ if ((conn->probable_keepalive_limit &&
+ conn->completed_requests > conn->probable_keepalive_limit) ||
+ (conn->max_outstanding_requests &&
+ conn->completed_requests - conn->completed_responses >=
+ conn->max_outstanding_requests)) {
+ /* we wouldn't try to write any way right now. */
+ }
+ else {
+ while (request != NULL && request->req_bkt == NULL &&
+ request->written)
+ request = request->next;
+ if (request != NULL)
+ desc.reqevents |= APR_POLLOUT;
+ }
+ }
+ }
+
+ /* If we can have async responses, always look for something to read. */
+ if (conn->async_responses) {
+ desc.reqevents |= APR_POLLIN;
+ }
+
+ /* save our reqevents, so we can pass it in to remove later. */
+ conn->reqevents = desc.reqevents;
+
+ /* Note: even if we don't want to read/write this socket, we still
+ * want to poll it for hangups and errors.
+ */
+ return ctx->pollset_add(ctx->pollset_baton,
+ &desc, &conn->baton);
+}
+
+#ifdef SERF_DEBUG_BUCKET_USE
+
+/* Make sure all response buckets were drained. */
+static void check_buckets_drained(serf_connection_t *conn)
+{
+ serf_request_t *request = conn->requests;
+
+ for ( ; request ; request = request->next ) {
+ if (request->resp_bkt != NULL) {
+ /* ### crap. can't do this. this allocator may have un-drained
+ * ### REQUEST buckets.
+ */
+ /* serf_debug__entered_loop(request->resp_bkt->allocator); */
+ /* ### for now, pretend we closed the conn (resets the tracking) */
+ serf_debug__closed_conn(request->resp_bkt->allocator);
+ }
+ }
+}
+
+#endif
+
+/* Create and connect sockets for any connections which don't have them
+ * yet. This is the core of our lazy-connect behavior.
+ */
+apr_status_t serf__open_connections(serf_context_t *ctx)
+{
+ int i;
+
+ for (i = ctx->conns->nelts; i--; ) {
+ serf_connection_t *conn = GET_CONN(ctx, i);
+ apr_status_t status;
+ apr_socket_t *skt;
+ apr_sockaddr_t *serv_addr;
+
+ conn->seen_in_pollset = 0;
+
+ if (conn->skt != NULL) {
+#ifdef SERF_DEBUG_BUCKET_USE
+ check_buckets_drained(conn);
+#endif
+ continue;
+ }
+
+ /* Delay opening until we have something to deliver! */
+ if (conn->requests == NULL) {
+ continue;
+ }
+
+ apr_pool_clear(conn->skt_pool);
+ apr_pool_cleanup_register(conn->skt_pool, conn, clean_skt, clean_skt);
+
+ /* Do we have to connect to a proxy server? */
+ if (ctx->proxy_address)
+ serv_addr = ctx->proxy_address;
+ else
+ serv_addr = conn->address;
+
+ if ((status = apr_socket_create(&skt, serv_addr->family,
+ SOCK_STREAM,
+#if APR_MAJOR_VERSION > 0
+ APR_PROTO_TCP,
+#endif
+ conn->skt_pool)) != APR_SUCCESS)
+ return status;
+
+ /* Set the socket to be non-blocking */
+ if ((status = apr_socket_timeout_set(skt, 0)) != APR_SUCCESS)
+ return status;
+
+ /* Disable Nagle's algorithm */
+ if ((status = apr_socket_opt_set(skt,
+ APR_TCP_NODELAY, 1)) != APR_SUCCESS)
+ return status;
+
+ /* Configured. Store it into the connection now. */
+ conn->skt = skt;
+
+ /* Now that the socket is set up, let's connect it. This should
+ * return immediately.
+ */
+ if ((status = apr_socket_connect(skt,
+ serv_addr)) != APR_SUCCESS) {
+ if (!APR_STATUS_IS_EINPROGRESS(status))
+ return status;
+ }
+
+ /* Flag our pollset as dirty now that we have a new socket. */
+ conn->dirty_conn = 1;
+ ctx->dirty_pollset = 1;
+
+ /* If the authentication was already started on another connection,
+ prepare this connection (it might be possible to skip some
+ part of the handshaking). */
+ if (ctx->proxy_address) {
+ if (conn->ctx->proxy_authn_info.scheme)
+ conn->ctx->proxy_authn_info.scheme->init_conn_func(407, conn,
+ conn->pool);
+ }
+
+ if (conn->ctx->authn_info.scheme)
+ conn->ctx->authn_info.scheme->init_conn_func(401, conn,
+ conn->pool);
+
+ /* Does this connection require a SSL tunnel over the proxy? */
+ if (ctx->proxy_address && strcmp(conn->host_info.scheme, "https") == 0)
+ serf__ssltunnel_connect(conn);
+ else
+ conn->state = SERF_CONN_CONNECTED;
+ }
+
+ return APR_SUCCESS;
+}
+
+static apr_status_t no_more_writes(serf_connection_t *conn,
+ serf_request_t *request)
+{
+ /* Note that we should hold new requests until we open our new socket. */
+ conn->state = SERF_CONN_CLOSING;
+
+ /* We can take the *next* request in our list and assume it hasn't
+ * been written yet and 'save' it for the new socket.
+ */
+ conn->hold_requests = request->next;
+ conn->hold_requests_tail = conn->requests_tail;
+ request->next = NULL;
+ conn->requests_tail = request;
+
+ /* Clear our iovec. */
+ conn->vec_len = 0;
+
+ /* Update the pollset to know we don't want to write on this socket any
+ * more.
+ */
+ conn->dirty_conn = 1;
+ conn->ctx->dirty_pollset = 1;
+ return APR_SUCCESS;
+}
+
+/* Read the 'Connection' header from the response. Return SERF_ERROR_CLOSING if
+ * the header contains value 'close' indicating the server is closing the
+ * connection right after this response.
+ * Otherwise returns APR_SUCCESS.
+ */
+static apr_status_t is_conn_closing(serf_bucket_t *response)
+{
+ serf_bucket_t *hdrs;
+ const char *val;
+
+ hdrs = serf_bucket_response_get_headers(response);
+ val = serf_bucket_headers_get(hdrs, "Connection");
+ if (val && strcasecmp("close", val) == 0)
+ {
+ return SERF_ERROR_CLOSING;
+ }
+
+ return APR_SUCCESS;
+}
+
+static void link_requests(serf_request_t **list, serf_request_t **tail,
+ serf_request_t *request)
+{
+ if (*list == NULL) {
+ *list = request;
+ *tail = request;
+ }
+ else {
+ (*tail)->next = request;
+ *tail = request;
+ }
+}
+
+static apr_status_t destroy_request(serf_request_t *request)
+{
+ serf_connection_t *conn = request->conn;
+
+ /* The request and response buckets are no longer needed,
+ nor is the request's pool. */
+ if (request->resp_bkt) {
+ serf_debug__closed_conn(request->resp_bkt->allocator);
+ serf_bucket_destroy(request->resp_bkt);
+ request->resp_bkt = NULL;
+ }
+ if (request->req_bkt) {
+ serf_debug__closed_conn(request->req_bkt->allocator);
+ serf_bucket_destroy(request->req_bkt);
+ request->req_bkt = NULL;
+ }
+
+ serf_debug__bucket_alloc_check(request->allocator);
+ if (request->respool) {
+ /* ### unregister the pool cleanup for self? */
+ apr_pool_destroy(request->respool);
+ }
+
+ serf_bucket_mem_free(conn->allocator, request);
+
+ return APR_SUCCESS;
+}
+
+static apr_status_t cancel_request(serf_request_t *request,
+ serf_request_t **list,
+ int notify_request)
+{
+ /* If we haven't run setup, then we won't have a handler to call. */
+ if (request->handler && notify_request) {
+ /* We actually don't care what the handler returns.
+ * We have bigger matters at hand.
+ */
+ (*request->handler)(request, NULL, request->handler_baton,
+ request->respool);
+ }
+
+ if (*list == request) {
+ *list = request->next;
+ }
+ else {
+ serf_request_t *scan = *list;
+
+ while (scan->next && scan->next != request)
+ scan = scan->next;
+
+ if (scan->next) {
+ scan->next = scan->next->next;
+ }
+ }
+
+ return destroy_request(request);
+}
+
+static apr_status_t remove_connection(serf_context_t *ctx,
+ serf_connection_t *conn)
+{
+ apr_pollfd_t desc = { 0 };
+
+ desc.desc_type = APR_POLL_SOCKET;
+ desc.desc.s = conn->skt;
+ desc.reqevents = conn->reqevents;
+
+ return ctx->pollset_rm(ctx->pollset_baton,
+ &desc, conn);
+}
+
+static void destroy_ostream(serf_connection_t *conn)
+{
+ if (conn->ostream_head != NULL) {
+ serf_bucket_destroy(conn->ostream_head);
+ conn->ostream_head = NULL;
+ conn->ostream_tail = NULL;
+ }
+}
+
+/* A socket was closed, inform the application. */
+static void handle_conn_closed(serf_connection_t *conn, apr_status_t status)
+{
+ (*conn->closed)(conn, conn->closed_baton, status,
+ conn->pool);
+}
+
+static apr_status_t reset_connection(serf_connection_t *conn,
+ int requeue_requests)
+{
+ serf_context_t *ctx = conn->ctx;
+ apr_status_t status;
+ serf_request_t *old_reqs, *held_reqs, *held_reqs_tail;
+
+ conn->probable_keepalive_limit = conn->completed_responses;
+ conn->completed_requests = 0;
+ conn->completed_responses = 0;
+
+ old_reqs = conn->requests;
+ held_reqs = conn->hold_requests;
+ held_reqs_tail = conn->hold_requests_tail;
+
+ if (conn->state == SERF_CONN_CLOSING) {
+ conn->hold_requests = NULL;
+ conn->hold_requests_tail = NULL;
+ }
+
+ conn->requests = NULL;
+ conn->requests_tail = NULL;
+
+ while (old_reqs) {
+ /* If we haven't started to write the connection, bring it over
+ * unchanged to our new socket. Otherwise, call the cancel function.
+ */
+ if (requeue_requests && !old_reqs->written) {
+ serf_request_t *req = old_reqs;
+ old_reqs = old_reqs->next;
+ req->next = NULL;
+ link_requests(&conn->requests, &conn->requests_tail, req);
+ }
+ else {
+ cancel_request(old_reqs, &old_reqs, requeue_requests);
+ }
+ }
+
+ if (conn->requests_tail) {
+ conn->requests_tail->next = held_reqs;
+ }
+ else {
+ conn->requests = held_reqs;
+ }
+ if (held_reqs_tail) {
+ conn->requests_tail = held_reqs_tail;
+ }
+
+ if (conn->skt != NULL) {
+ remove_connection(ctx, conn);
+ status = apr_socket_close(conn->skt);
+ if (conn->closed != NULL) {
+ handle_conn_closed(conn, status);
+ }
+ conn->skt = NULL;
+ }
+
+ if (conn->stream != NULL) {
+ serf_bucket_destroy(conn->stream);
+ conn->stream = NULL;
+ }
+
+ destroy_ostream(conn);
+
+ /* Don't try to resume any writes */
+ conn->vec_len = 0;
+
+ conn->dirty_conn = 1;
+ conn->ctx->dirty_pollset = 1;
+ conn->state = SERF_CONN_INIT;
+
+ conn->status = APR_SUCCESS;
+
+ /* Let our context know that we've 'reset' the socket already. */
+ conn->seen_in_pollset |= APR_POLLHUP;
+
+ /* Found the connection. Closed it. All done. */
+ return APR_SUCCESS;
+}
+
+static apr_status_t socket_writev(serf_connection_t *conn)
+{
+ apr_size_t written;
+ apr_status_t status;
+
+ status = apr_socket_sendv(conn->skt, conn->vec,
+ conn->vec_len, &written);
+
+ /* did we write everything? */
+ if (written) {
+ apr_size_t len = 0;
+ int i;
+
+ for (i = 0; i < conn->vec_len; i++) {
+ len += conn->vec[i].iov_len;
+ if (written < len) {
+ if (i) {
+ memmove(conn->vec, &conn->vec[i],
+ sizeof(struct iovec) * (conn->vec_len - i));
+ conn->vec_len -= i;
+ }
+ conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + (conn->vec[0].iov_len - (len - written));
+ conn->vec[0].iov_len = len - written;
+ break;
+ }
+ }
+ if (len == written) {
+ conn->vec_len = 0;
+ }
+
+ /* Log progress information */
+ serf__context_progress_delta(conn->ctx, 0, written);
+ }
+
+ return status;
+}
+
+static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
+{
+ serf_connection_t *conn = baton;
+ conn->hit_eof = 1;
+ return APR_EAGAIN;
+}
+
+static apr_status_t do_conn_setup(serf_connection_t *conn)
+{
+ apr_status_t status;
+ serf_bucket_t *ostream;
+
+ if (conn->ostream_head == NULL) {
+ conn->ostream_head = serf_bucket_aggregate_create(conn->allocator);
+ }
+
+ if (conn->ostream_tail == NULL) {
+ conn->ostream_tail = serf__bucket_stream_create(conn->allocator,
+ detect_eof,
+ conn);
+ }
+
+ ostream = conn->ostream_tail;
+
+ status = (*conn->setup)(conn->skt,
+ &conn->stream,
+ &ostream,
+ conn->setup_baton,
+ conn->pool);
+ if (status) {
+ /* extra destroy here since it wasn't added to the head bucket yet. */
+ serf_bucket_destroy(conn->ostream_tail);
+ destroy_ostream(conn);
+ return status;
+ }
+
+ serf_bucket_aggregate_append(conn->ostream_head,
+ ostream);
+
+ return status;
+}
+
+/* Set up the input and output stream buckets.
+ When a tunnel over an http proxy is needed, create a socket bucket and
+ empty aggregate bucket for sending and receiving unencrypted requests
+ over the socket.
+
+ After the tunnel is there, or no tunnel was needed, ask the application
+ to create the input and output buckets, which should take care of the
+ [en/de]cryption.
+*/
+
+static apr_status_t prepare_conn_streams(serf_connection_t *conn,
+ serf_bucket_t **istream,
+ serf_bucket_t **ostreamt,
+ serf_bucket_t **ostreamh)
+{
+ apr_status_t status;
+
+ /* Do we need a SSL tunnel first? */
+ if (conn->state == SERF_CONN_CONNECTED) {
+ /* If the connection does not have an associated bucket, then
+ * call the setup callback to get one.
+ */
+ if (conn->stream == NULL) {
+ status = do_conn_setup(conn);
+ if (status) {
+ return status;
+ }
+ }
+ *ostreamt = conn->ostream_tail;
+ *ostreamh = conn->ostream_head;
+ *istream = conn->stream;
+ } else {
+ /* SSL tunnel needed and not set up yet, get a direct unencrypted
+ stream for this socket */
+ if (conn->stream == NULL) {
+ *istream = serf_bucket_socket_create(conn->skt,
+ conn->allocator);
+ }
+ /* Don't create the ostream bucket chain including the ssl_encrypt
+ bucket yet. This ensure the CONNECT request is sent unencrypted
+ to the proxy. */
+ *ostreamt = *ostreamh = conn->ssltunnel_ostream;
+ }
+
+ return APR_SUCCESS;
+}
+
+/* write data out to the connection */
+static apr_status_t write_to_connection(serf_connection_t *conn)
+{
+ serf_request_t *request = conn->requests;
+
+ if (conn->probable_keepalive_limit &&
+ conn->completed_requests > conn->probable_keepalive_limit) {
+
+ conn->dirty_conn = 1;
+ conn->ctx->dirty_pollset = 1;
+
+ /* backoff for now. */
+ return APR_SUCCESS;
+ }
+
+ /* Find a request that has data which needs to be delivered. */
+ while (request != NULL &&
+ request->req_bkt == NULL && request->written)
+ request = request->next;
+
+ /* assert: request != NULL || conn->vec_len */
+
+ /* Keep reading and sending until we run out of stuff to read, or
+ * writing would block.
+ */
+ while (1) {
+ int stop_reading = 0;
+ apr_status_t status;
+ apr_status_t read_status;
+ serf_bucket_t *ostreamt, *ostreamh;
+ int max_outstanding_requests = conn->max_outstanding_requests;
+
+ /* If we're setting up an ssl tunnel, we can't send real requests
+ at yet, as they need to be encrypted and our encrypt buckets
+ aren't created yet as we still need to read the unencrypted
+ response of the CONNECT request. */
+ if (conn->state != SERF_CONN_CONNECTED)
+ max_outstanding_requests = 1;
+
+ if (max_outstanding_requests &&
+ conn->completed_requests -
+ conn->completed_responses >= max_outstanding_requests) {
+ /* backoff for now. */
+ return APR_SUCCESS;
+ }
+
+ /* If we have unwritten data, then write what we can. */
+ while (conn->vec_len) {
+ status = socket_writev(conn);
+
+ /* If the write would have blocked, then we're done. Don't try
+ * to write anything else to the socket.
+ */
+ if (APR_STATUS_IS_EAGAIN(status))
+ return APR_SUCCESS;
+ if (APR_STATUS_IS_EPIPE(status))
+ return no_more_writes(conn, request);
+ if (status)
+ return status;
+ }
+ /* ### can we have a short write, yet no EAGAIN? a short write
+ ### would imply unwritten_len > 0 ... */
+ /* assert: unwritten_len == 0. */
+
+ /* We may need to move forward to a request which has something
+ * to write.
+ */
+ while (request != NULL &&
+ request->req_bkt == NULL && request->written)
+ request = request->next;
+
+ if (request == NULL) {
+ /* No more requests (with data) are registered with the
+ * connection. Let's update the pollset so that we don't
+ * try to write to this socket again.
+ */
+ conn->dirty_conn = 1;
+ conn->ctx->dirty_pollset = 1;
+ return APR_SUCCESS;
+ }
+
+ status = prepare_conn_streams(conn, &conn->stream, &ostreamt, &ostreamh);
+ if (status) {
+ return status;
+ }
+
+ if (request->req_bkt == NULL) {
+ /* Now that we are about to serve the request, allocate a pool. */
+ apr_pool_create(&request->respool, conn->pool);
+ request->allocator = serf_bucket_allocator_create(request->respool,
+ NULL, NULL);
+ apr_pool_cleanup_register(request->respool, request,
+ clean_resp, clean_resp);
+
+ /* Fill in the rest of the values for the request. */
+ read_status = request->setup(request, request->setup_baton,
+ &request->req_bkt,
+ &request->acceptor,
+ &request->acceptor_baton,
+ &request->handler,
+ &request->handler_baton,
+ request->respool);
+
+ if (read_status) {
+ /* Something bad happened. Propagate any errors. */
+ return read_status;
+ }
+
+ request->written = 1;
+ serf_bucket_aggregate_append(ostreamt, request->req_bkt);
+ }
+
+ /* ### optimize at some point by using read_for_sendfile */
+ read_status = serf_bucket_read_iovec(ostreamh,
+ SERF_READ_ALL_AVAIL,
+ IOV_MAX,
+ conn->vec,
+ &conn->vec_len);
+
+ if (!conn->hit_eof) {
+ if (APR_STATUS_IS_EAGAIN(read_status) ||
+ read_status == SERF_ERROR_WAIT_CONN) {
+ /* We read some stuff, but should not try to read again. */
+ stop_reading = 1;
+
+ /* ### we should avoid looking for writability for a while so
+ ### that (hopefully) something will appear in the bucket so
+ ### we can actually write something. otherwise, we could
+ ### end up in a CPU spin: socket wants something, but we
+ ### don't have anything (and keep returning EAGAIN)
+ */
+ }
+ else if (read_status && !APR_STATUS_IS_EOF(read_status)) {
+ /* Something bad happened. Propagate any errors. */
+ return read_status;
+ }
+ }
+
+ /* If we got some data, then deliver it. */
+ /* ### what to do if we got no data?? is that a problem? */
+ if (conn->vec_len > 0) {
+ status = socket_writev(conn);
+
+ /* If we can't write any more, or an error occurred, then
+ * we're done here.
+ */
+ if (APR_STATUS_IS_EAGAIN(status))
+ return APR_SUCCESS;
+ if (APR_STATUS_IS_EPIPE(status))
+ return no_more_writes(conn, request);
+ if (APR_STATUS_IS_ECONNRESET(status)) {
+ return no_more_writes(conn, request);
+ }
+ if (status)
+ return status;
+ }
+
+ if (read_status == SERF_ERROR_WAIT_CONN) {
+ stop_reading = 1;
+ }
+ else if (read_status && conn->hit_eof && conn->vec_len == 0) {
+ /* If we hit the end of the request bucket and all of its data has
+ * been written, then clear it out to signify that we're done
+ * sending the request. On the next iteration through this loop:
+ * - if there are remaining bytes they will be written, and as the
+ * request bucket will be completely read it will be destroyed then.
+ * - we'll see if there are other requests that need to be sent
+ * ("pipelining").
+ */
+ conn->hit_eof = 0;
+ serf_bucket_destroy(request->req_bkt);
+ request->req_bkt = NULL;
+
+ /* If our connection has async responses enabled, we're not
+ * going to get a reply back, so kill the request.
+ */
+ if (conn->async_responses) {
+ conn->requests = request->next;
+ destroy_request(request);
+ }
+
+ conn->completed_requests++;
+
+ if (conn->probable_keepalive_limit &&
+ conn->completed_requests > conn->probable_keepalive_limit) {
+ /* backoff for now. */
+ stop_reading = 1;
+ }
+ }
+
+ if (stop_reading) {
+ return APR_SUCCESS;
+ }
+ }
+ /* NOTREACHED */
+}
+
+/* A response message was received from the server, so call
+ the handler as specified on the original request. */
+static apr_status_t handle_response(serf_request_t *request,
+ apr_pool_t *pool)
+{
+ apr_status_t status = APR_SUCCESS;
+ int consumed_response = 0;
+
+ /* Only enable the new authentication framework if the program has
+ * registered an authentication credential callback.
+ *
+ * This permits older Serf apps to still handle authentication
+ * themselves by not registering credential callbacks.
+ */
+ if (request->conn->ctx->cred_cb) {
+ status = serf__handle_auth_response(&consumed_response,
+ request,
+ request->resp_bkt,
+ request->handler_baton,
+ pool);
+
+ /* If there was an error reading the response (maybe there wasn't
+ enough data available), don't bother passing the response to the
+ application.
+
+ If the authentication was tried, but failed, pass the response
+ to the application, maybe it can do better. */
+ if (APR_STATUS_IS_EOF(status) ||
+ APR_STATUS_IS_EAGAIN(status)) {
+ return status;
+ }
+ }
+
+ if (!consumed_response) {
+ return (*request->handler)(request,
+ request->resp_bkt,
+ request->handler_baton,
+ pool);
+ }
+
+ return status;
+}
+
+/* An async response message was received from the server. */
+static apr_status_t handle_async_response(serf_connection_t *conn,
+ apr_pool_t *pool)
+{
+ apr_status_t status;
+
+ if (conn->current_async_response == NULL) {
+ conn->current_async_response =
+ (*conn->async_acceptor)(NULL, conn->stream,
+ conn->async_acceptor_baton, pool);
+ }
+
+ status = (*conn->async_handler)(NULL, conn->current_async_response,
+ conn->async_handler_baton, pool);
+
+ if (APR_STATUS_IS_EOF(status)) {
+ serf_bucket_destroy(conn->current_async_response);
+ conn->current_async_response = NULL;
+ status = APR_SUCCESS;
+ }
+
+ return status;
+}
+
+/* read data from the connection */
+static apr_status_t read_from_connection(serf_connection_t *conn)
+{
+ apr_status_t status;
+ apr_pool_t *tmppool;
+ int close_connection = FALSE;
+
+ /* Whatever is coming in on the socket corresponds to the first request
+ * on our chain.
+ */
+ serf_request_t *request = conn->requests;
+
+ /* assert: request != NULL */
+
+ if ((status = apr_pool_create(&tmppool, conn->pool)) != APR_SUCCESS)
+ goto error;
+
+ /* Invoke response handlers until we have no more work. */
+ while (1) {
+ serf_bucket_t *dummy1, *dummy2;
+
+ apr_pool_clear(tmppool);
+
+ /* Only interested in the input stream here. */
+ status = prepare_conn_streams(conn, &conn->stream, &dummy1, &dummy2);
+ if (status) {
+ goto error;
+ }
+
+ /* We have a different codepath when we can have async responses. */
+ if (conn->async_responses) {
+ /* TODO What about socket errors? */
+ status = handle_async_response(conn, tmppool);
+ if (APR_STATUS_IS_EAGAIN(status)) {
+ status = APR_SUCCESS;
+ goto error;
+ }
+ if (status) {
+ goto error;
+ }
+ continue;
+ }
+
+ /* We are reading a response for a request we haven't
+ * written yet!
+ *
+ * This shouldn't normally happen EXCEPT:
+ *
+ * 1) when the other end has closed the socket and we're
+ * pending an EOF return.
+ * 2) Doing the initial SSL handshake - we'll get EAGAIN
+ * as the SSL buckets will hide the handshake from us
+ * but not return any data.
+ *
+ * In these cases, we should not receive any actual user data.
+ *
+ * If we see an EOF (due to an expired timeout), we'll reset the
+ * connection and open a new one.
+ */
+ if (request->req_bkt || !request->written) {
+ const char *data;
+ apr_size_t len;
+
+ status = serf_bucket_read(conn->stream, SERF_READ_ALL_AVAIL,
+ &data, &len);
+
+ if (!status && len) {
+ status = APR_EGENERAL;
+ }
+ else if (APR_STATUS_IS_EOF(status)) {
+ reset_connection(conn, 1);
+ status = APR_SUCCESS;
+ }
+ else if (APR_STATUS_IS_EAGAIN(status)) {
+ status = APR_SUCCESS;
+ }
+
+ goto error;
+ }
+
+ /* If the request doesn't have a response bucket, then call the
+ * acceptor to get one created.
+ */
+ if (request->resp_bkt == NULL) {
+ request->resp_bkt = (*request->acceptor)(request, conn->stream,
+ request->acceptor_baton,
+ tmppool);
+ apr_pool_clear(tmppool);
+ }
+
+ status = handle_response(request, tmppool);
+
+ /* Some systems will not generate a HUP poll event so we have to
+ * handle the ECONNRESET issue here.
+ */
+ if (APR_STATUS_IS_ECONNRESET(status) ||
+ status == SERF_ERROR_REQUEST_LOST) {
+ reset_connection(conn, 1);
+ status = APR_SUCCESS;
+ goto error;
+ }
+
+ /* If our response handler says it can't do anything more, we now
+ * treat that as a success.
+ */
+ if (APR_STATUS_IS_EAGAIN(status)) {
+ status = APR_SUCCESS;
+ goto error;
+ }
+
+ /* If we received APR_SUCCESS, run this loop again. */
+ if (!status) {
+ continue;
+ }
+
+ close_connection = is_conn_closing(request->resp_bkt);
+
+ if (!APR_STATUS_IS_EOF(status) &&
+ close_connection != SERF_ERROR_CLOSING) {
+ /* Whether success, or an error, there is no more to do unless
+ * this request has been completed.
+ */
+ goto error;
+ }
+
+ /* The request has been fully-delivered, and the response has
+ * been fully-read. Remove it from our queue and loop to read
+ * another response.
+ */
+ conn->requests = request->next;
+
+ destroy_request(request);
+
+ request = conn->requests;
+
+ /* If we're truly empty, update our tail. */
+ if (request == NULL) {
+ conn->requests_tail = NULL;
+ }
+
+ conn->completed_responses++;
+
+ /* We've to rebuild pollset since completed_responses is changed. */
+ conn->dirty_conn = 1;
+ conn->ctx->dirty_pollset = 1;
+
+ /* This means that we're being advised that the connection is done. */
+ if (close_connection == SERF_ERROR_CLOSING) {
+ reset_connection(conn, 1);
+ if (APR_STATUS_IS_EOF(status))
+ status = APR_SUCCESS;
+ goto error;
+ }
+
+ /* The server is suddenly deciding to serve more responses than we've
+ * seen before.
+ *
+ * Let our requests go.
+ */
+ if (conn->probable_keepalive_limit &&
+ conn->completed_responses > conn->probable_keepalive_limit) {
+ conn->probable_keepalive_limit = 0;
+ }
+
+ /* If we just ran out of requests or have unwritten requests, then
+ * update the pollset. We don't want to read from this socket any
+ * more. We are definitely done with this loop, too.
+ */
+ if (request == NULL || !request->written) {
+ conn->dirty_conn = 1;
+ conn->ctx->dirty_pollset = 1;
+ status = APR_SUCCESS;
+ goto error;
+ }
+ }
+
+error:
+ apr_pool_destroy(tmppool);
+ return status;
+}
+
+/* process all events on the connection */
+apr_status_t serf__process_connection(serf_connection_t *conn,
+ apr_int16_t events)
+{
+ apr_status_t status;
+
+ /* POLLHUP/ERR should come after POLLIN so if there's an error message or
+ * the like sitting on the connection, we give the app a chance to read
+ * it before we trigger a reset condition.
+ */
+ if ((events & APR_POLLIN) != 0) {
+ if ((status = read_from_connection(conn)) != APR_SUCCESS)
+ return status;
+
+ /* If we decided to reset our connection, return now as we don't
+ * want to write.
+ */
+ if ((conn->seen_in_pollset & APR_POLLHUP) != 0) {
+ return APR_SUCCESS;
+ }
+ }
+ if ((events & APR_POLLHUP) != 0) {
+ /* The connection got reset by the server. On Windows this can happen
+ when all data is read, so just cleanup the connection and open
+ a new one. */
+ return reset_connection(conn, 1);
+ }
+ if ((events & APR_POLLERR) != 0) {
+ /* We might be talking to a buggy HTTP server that doesn't
+ * do lingering-close. (httpd < 2.1.8 does this.)
+ *
+ * See:
+ *
+ * http://issues.apache.org/bugzilla/show_bug.cgi?id=35292
+ */
+ if (conn->completed_requests && !conn->probable_keepalive_limit) {
+ return reset_connection(conn, 1);
+ }
+ return APR_EGENERAL;
+ }
+ if ((events & APR_POLLOUT) != 0) {
+ if ((status = write_to_connection(conn)) != APR_SUCCESS)
+ return status;
+ }
+ return APR_SUCCESS;
+}
+
+serf_connection_t *serf_connection_create(
+ serf_context_t *ctx,
+ apr_sockaddr_t *address,
+ serf_connection_setup_t setup,
+ void *setup_baton,
+ serf_connection_closed_t closed,
+ void *closed_baton,
+ apr_pool_t *pool)
+{
+ serf_connection_t *conn = apr_pcalloc(pool, sizeof(*conn));
+
+ conn->ctx = ctx;
+ conn->status = APR_SUCCESS;
+ conn->address = address;
+ conn->setup = setup;
+ conn->setup_baton = setup_baton;
+ conn->closed = closed;
+ conn->closed_baton = closed_baton;
+ conn->pool = pool;
+ conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL);
+ conn->stream = NULL;
+ conn->ostream_head = NULL;
+ conn->ostream_tail = NULL;
+ conn->baton.type = SERF_IO_CONN;
+ conn->baton.u.conn = conn;
+ conn->hit_eof = 0;
+ conn->state = SERF_CONN_INIT;
+
+ /* Create a subpool for our connection. */
+ apr_pool_create(&conn->skt_pool, conn->pool);
+
+ /* register a cleanup */
+ apr_pool_cleanup_register(conn->pool, conn, clean_conn, apr_pool_cleanup_null);
+
+ /* Add the connection to the context. */
+ *(serf_connection_t **)apr_array_push(ctx->conns) = conn;
+
+ return conn;
+}
+
+apr_status_t serf_connection_create2(
+ serf_connection_t **conn,
+ serf_context_t *ctx,
+ apr_uri_t host_info,
+ serf_connection_setup_t setup,
+ void *setup_baton,
+ serf_connection_closed_t closed,
+ void *closed_baton,
+ apr_pool_t *pool)
+{
+ apr_status_t status;
+ serf_connection_t *c;
+ apr_sockaddr_t *host_address;
+
+ /* Parse the url, store the address of the server. */
+ status = apr_sockaddr_info_get(&host_address,
+ host_info.hostname,
+ APR_UNSPEC, host_info.port, 0, pool);
+ if (status)
+ return status;
+
+ c = serf_connection_create(ctx, host_address, setup, setup_baton,
+ closed, closed_baton, pool);
+
+ /* We're not interested in the path following the hostname. */
+ c->host_url = apr_uri_unparse(c->pool,
+ &host_info,
+ APR_URI_UNP_OMITPATHINFO);
+ c->host_info = host_info;
+
+ *conn = c;
+
+ return status;
+}
+
+apr_status_t serf_connection_reset(
+ serf_connection_t *conn)
+{
+ return reset_connection(conn, 0);
+}
+
+
+apr_status_t serf_connection_close(
+ serf_connection_t *conn)
+{
+ int i;
+ serf_context_t *ctx = conn->ctx;
+ apr_status_t status;
+
+ for (i = ctx->conns->nelts; i--; ) {
+ serf_connection_t *conn_seq = GET_CONN(ctx, i);
+
+ if (conn_seq == conn) {
+ while (conn->requests) {
+ serf_request_cancel(conn->requests);
+ }
+ if (conn->skt != NULL) {
+ remove_connection(ctx, conn);
+ status = apr_socket_close(conn->skt);
+ if (conn->closed != NULL) {
+ handle_conn_closed(conn, status);
+ }
+ conn->skt = NULL;
+ }
+ if (conn->stream != NULL) {
+ serf_bucket_destroy(conn->stream);
+ conn->stream = NULL;
+ }
+
+ /* Remove the connection from the context. We don't want to
+ * deal with it any more.
+ */
+ if (i < ctx->conns->nelts - 1) {
+ /* move later connections over this one. */
+ memmove(
+ &GET_CONN(ctx, i),
+ &GET_CONN(ctx, i + 1),
+ (ctx->conns->nelts - i - 1) * sizeof(serf_connection_t *));
+ }
+ --ctx->conns->nelts;
+
+ /* Found the connection. Closed it. All done. */
+ return APR_SUCCESS;
+ }
+ }
+
+ /* We didn't find the specified connection. */
+ /* ### doc talks about this w.r.t poll structures. use something else? */
+ return APR_NOTFOUND;
+}
+
+
+void serf_connection_set_max_outstanding_requests(
+ serf_connection_t *conn,
+ unsigned int max_requests)
+{
+ conn->max_outstanding_requests = max_requests;
+}
+
+
+void serf_connection_set_async_responses(
+ serf_connection_t *conn,
+ serf_response_acceptor_t acceptor,
+ void *acceptor_baton,
+ serf_response_handler_t handler,
+ void *handler_baton)
+{
+ conn->async_responses = 1;
+ conn->async_acceptor = acceptor;
+ conn->async_acceptor_baton = acceptor_baton;
+ conn->async_handler = handler;
+ conn->async_handler_baton = handler_baton;
+}
+
+
+serf_request_t *serf_connection_request_create(
+ serf_connection_t *conn,
+ serf_request_setup_t setup,
+ void *setup_baton)
+{
+ serf_request_t *request;
+
+ request = serf_bucket_mem_alloc(conn->allocator, sizeof(*request));
+ request->conn = conn;
+ request->setup = setup;
+ request->setup_baton = setup_baton;
+ request->handler = NULL;
+ request->respool = NULL;
+ request->req_bkt = NULL;
+ request->resp_bkt = NULL;
+ request->priority = 0;
+ request->written = 0;
+ request->next = NULL;
+
+ /* Link the request to the end of the request chain. */
+ if (conn->state == SERF_CONN_CLOSING) {
+ link_requests(&conn->hold_requests, &conn->hold_requests_tail, request);
+ }
+ else {
+ link_requests(&conn->requests, &conn->requests_tail, request);
+
+ /* Ensure our pollset becomes writable in context run */
+ conn->ctx->dirty_pollset = 1;
+ conn->dirty_conn = 1;
+ }
+
+ return request;
+}
+
+
+serf_request_t *serf_connection_priority_request_create(
+ serf_connection_t *conn,
+ serf_request_setup_t setup,
+ void *setup_baton)
+{
+ serf_request_t *request;
+ serf_request_t *iter, *prev;
+
+ request = serf_bucket_mem_alloc(conn->allocator, sizeof(*request));
+ request->conn = conn;
+ request->setup = setup;
+ request->setup_baton = setup_baton;
+ request->handler = NULL;
+ request->respool = NULL;
+ request->req_bkt = NULL;
+ request->resp_bkt = NULL;
+ request->priority = 1;
+ request->written = 0;
+ request->next = NULL;
+
+ /* Link the new request after the last written request, but before all
+ upcoming requests. */
+ if (conn->state == SERF_CONN_CLOSING) {
+ iter = conn->hold_requests;
+ }
+ else {
+ iter = conn->requests;
+ }
+ prev = NULL;
+
+ /* Find a request that has data which needs to be delivered. */
+ while (iter != NULL && iter->req_bkt == NULL && iter->written) {
+ prev = iter;
+ iter = iter->next;
+ }
+
+ /* Advance to next non priority request */
+ while (iter != NULL && iter->priority) {
+ prev = iter;
+ iter = iter->next;
+ }
+
+ if (prev) {
+ request->next = iter;
+ prev->next = request;
+ } else {
+ request->next = iter;
+ if (conn->state == SERF_CONN_CLOSING) {
+ conn->hold_requests = request;
+ }
+ else {
+ conn->requests = request;
+ }
+ }
+
+ if (conn->state != SERF_CONN_CLOSING) {
+ /* Ensure our pollset becomes writable in context run */
+ conn->ctx->dirty_pollset = 1;
+ conn->dirty_conn = 1;
+ }
+
+ return request;
+}
+
+
+apr_status_t serf_request_cancel(serf_request_t *request)
+{
+ return cancel_request(request, &request->conn->requests, 0);
+}
+
+
+apr_pool_t *serf_request_get_pool(const serf_request_t *request)
+{
+ return request->respool;
+}
+
+
+serf_bucket_alloc_t *serf_request_get_alloc(
+ const serf_request_t *request)
+{
+ return request->allocator;
+}
+
+
+serf_connection_t *serf_request_get_conn(
+ const serf_request_t *request)
+{
+ return request->conn;
+}
+
+
+void serf_request_set_handler(
+ serf_request_t *request,
+ const serf_response_handler_t handler,
+ const void **handler_baton)
+{
+ request->handler = handler;
+ request->handler_baton = handler_baton;
+}
+
+
+serf_bucket_t *serf_request_bucket_request_create(
+ serf_request_t *request,
+ const char *method,
+ const char *uri,
+ serf_bucket_t *body,
+ serf_bucket_alloc_t *allocator)
+{
+ serf_bucket_t *req_bkt, *hdrs_bkt;
+ serf_connection_t *conn = request->conn;
+ serf_context_t *ctx = conn->ctx;
+
+ req_bkt = serf_bucket_request_create(method, uri, body, allocator);
+ hdrs_bkt = serf_bucket_request_get_headers(req_bkt);
+
+ /* Proxy? */
+ if (ctx->proxy_address && conn->host_url)
+ serf_bucket_request_set_root(req_bkt, conn->host_url);
+
+ if (conn->host_info.hostinfo)
+ serf_bucket_headers_setn(hdrs_bkt, "Host",
+ conn->host_info.hostinfo);
+
+ /* Setup server authorization headers */
+ if (ctx->authn_info.scheme)
+ ctx->authn_info.scheme->setup_request_func(401, conn, method, uri,
+ hdrs_bkt);
+
+ /* Setup proxy authorization headers */
+ if (ctx->proxy_authn_info.scheme)
+ ctx->proxy_authn_info.scheme->setup_request_func(407, conn, method,
+ uri, hdrs_bkt);
+
+ return req_bkt;
+}