diff options
Diffstat (limited to 'trunk/daemon/soup-input-stream.c')
-rw-r--r-- | trunk/daemon/soup-input-stream.c | 921 |
1 files changed, 921 insertions, 0 deletions
diff --git a/trunk/daemon/soup-input-stream.c b/trunk/daemon/soup-input-stream.c new file mode 100644 index 00000000..ce3fa939 --- /dev/null +++ b/trunk/daemon/soup-input-stream.c @@ -0,0 +1,921 @@ +/* soup-input-stream.c, based on gsocketinputstream.c + * + * Copyright (C) 2006-2007 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include <config.h> + +#include <string.h> + +#include <glib.h> +#include <gio/gio.h> + +#include <libsoup/soup.h> + +#include "soup-input-stream.h" + +static void soup_input_stream_seekable_iface_init (GSeekableIface *seekable_iface); + +G_DEFINE_TYPE_WITH_CODE (SoupInputStream, soup_input_stream, G_TYPE_INPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE, + soup_input_stream_seekable_iface_init)) + +typedef void (*SoupInputStreamCallback) (GInputStream *); + +typedef struct { + SoupSession *session; + GMainContext *async_context; + SoupMessage *msg; + gboolean got_headers, finished; + goffset offset; + + GCancellable *cancellable; + GSource *cancel_watch; + SoupInputStreamCallback got_headers_cb; + SoupInputStreamCallback got_chunk_cb; + SoupInputStreamCallback finished_cb; + SoupInputStreamCallback cancelled_cb; + + guchar *leftover_buffer; + gsize leftover_bufsize, leftover_offset; + + guchar *caller_buffer; + gsize caller_bufsize, caller_nread; + GAsyncReadyCallback outstanding_callback; + GSimpleAsyncResult *result; + +} SoupInputStreamPrivate; +#define SOUP_INPUT_STREAM_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_INPUT_STREAM, SoupInputStreamPrivate)) + + +static gssize soup_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error); +static gboolean soup_input_stream_close (GInputStream *stream, + GCancellable *cancellable, + GError **error); +static void soup_input_stream_read_async (GInputStream *stream, + void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gssize soup_input_stream_read_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +static void soup_input_stream_close_async (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gboolean soup_input_stream_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); + +static goffset soup_input_stream_tell (GSeekable *seekable); + +static gboolean soup_input_stream_can_seek (GSeekable *seekable); +static gboolean soup_input_stream_seek (GSeekable *seekable, + goffset offset, + GSeekType type, + GCancellable *cancellable, + GError **error); + +static gboolean soup_input_stream_can_truncate (GSeekable *seekable); +static gboolean soup_input_stream_truncate (GSeekable *seekable, + goffset offset, + GCancellable *cancellable, + GError **error); + +static void soup_input_stream_got_headers (SoupMessage *msg, gpointer stream); +static void soup_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk, gpointer stream); +static void soup_input_stream_finished (SoupMessage *msg, gpointer stream); + +static void +soup_input_stream_finalize (GObject *object) +{ + SoupInputStream *stream = SOUP_INPUT_STREAM (object); + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + + g_object_unref (priv->session); + + g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_input_stream_got_headers), stream); + g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_input_stream_got_chunk), stream); + g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_input_stream_finished), stream); + g_object_unref (priv->msg); + g_free (priv->leftover_buffer); + + if (G_OBJECT_CLASS (soup_input_stream_parent_class)->finalize) + (*G_OBJECT_CLASS (soup_input_stream_parent_class)->finalize) (object); +} + +static void +soup_input_stream_class_init (SoupInputStreamClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass); + + g_type_class_add_private (klass, sizeof (SoupInputStreamPrivate)); + + gobject_class->finalize = soup_input_stream_finalize; + + stream_class->read_fn = soup_input_stream_read; + stream_class->close_fn = soup_input_stream_close; + stream_class->read_async = soup_input_stream_read_async; + stream_class->read_finish = soup_input_stream_read_finish; + stream_class->close_async = soup_input_stream_close_async; + stream_class->close_finish = soup_input_stream_close_finish; +} + +static void +soup_input_stream_seekable_iface_init (GSeekableIface *seekable_iface) +{ + seekable_iface->tell = soup_input_stream_tell; + seekable_iface->can_seek = soup_input_stream_can_seek; + seekable_iface->seek = soup_input_stream_seek; + seekable_iface->can_truncate = soup_input_stream_can_truncate; + seekable_iface->truncate_fn = soup_input_stream_truncate; +} + +static void +soup_input_stream_init (SoupInputStream *stream) +{ + ; +} + +static void +soup_input_stream_queue_message (SoupInputStream *stream) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + + priv->got_headers = priv->finished = FALSE; + + /* Add an extra ref since soup_session_queue_message steals one */ + g_object_ref (priv->msg); + soup_session_queue_message (priv->session, priv->msg, NULL, NULL); +} + +/** + * soup_input_stream_new: + * @session: the #SoupSession to use + * @msg: the #SoupMessage whose response will be streamed + * + * Prepares to send @msg over @session, and returns a #GInputStream + * that can be used to read the response. + * + * @msg may not be sent until the first read call; if you need to look + * at the status code or response headers before reading the body, you + * can use soup_input_stream_send() or soup_input_stream_send_async() + * to force the message to be sent and the response headers read. + * + * If @msg gets a non-2xx result, the first read (or send) will return + * an error with type %SOUP_INPUT_STREAM_HTTP_ERROR. + * + * Internally, #SoupInputStream is implemented using asynchronous I/O, + * so if you are using the synchronous API (eg, + * g_input_stream_read()), you should create a new #GMainContext and + * set it as the %SOUP_SESSION_ASYNC_CONTEXT property on @session. (If + * you don't, then synchronous #GInputStream calls will cause the main + * loop to be run recursively.) The async #GInputStream API works fine + * with %SOUP_SESSION_ASYNC_CONTEXT either set or unset. + * + * Returns: a new #GInputStream. + **/ +GInputStream * +soup_input_stream_new (SoupSession *session, SoupMessage *msg) +{ + SoupInputStream *stream; + SoupInputStreamPrivate *priv; + + g_return_val_if_fail (SOUP_IS_MESSAGE (msg), NULL); + + stream = g_object_new (SOUP_TYPE_INPUT_STREAM, NULL); + priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + + priv->session = g_object_ref (session); + priv->async_context = soup_session_get_async_context (session); + priv->msg = g_object_ref (msg); + + g_signal_connect (msg, "got_headers", + G_CALLBACK (soup_input_stream_got_headers), stream); + g_signal_connect (msg, "got_chunk", + G_CALLBACK (soup_input_stream_got_chunk), stream); + g_signal_connect (msg, "finished", + G_CALLBACK (soup_input_stream_finished), stream); + + soup_input_stream_queue_message (stream); + return G_INPUT_STREAM (stream); +} + +static void +soup_input_stream_got_headers (SoupMessage *msg, gpointer stream) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + + /* If the status is unsuccessful, we just ignore the signal and let + * libsoup keep going (eventually either it will requeue the request + * (after handling authentication/redirection), or else the + * "finished" handler will run). + */ + if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) + return; + + priv->got_headers = TRUE; + if (!priv->caller_buffer) + { + /* Not ready to read the body yet */ + soup_session_pause_message (priv->session, msg); + } + + if (priv->got_headers_cb) + priv->got_headers_cb (stream); +} + +static void +soup_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk_buffer, + gpointer stream) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + const gchar *chunk = chunk_buffer->data; + gsize chunk_size = chunk_buffer->length; + + /* We only pay attention to the chunk if it's part of a successful + * response. + */ + if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) + return; + + /* Sanity check */ + if (priv->caller_bufsize == 0 || priv->leftover_bufsize != 0) + g_warning ("soup_input_stream_got_chunk called again before previous chunk was processed"); + + /* Copy what we can into priv->caller_buffer */ + if (priv->caller_bufsize - priv->caller_nread > 0) + { + gsize nread = MIN (chunk_size, priv->caller_bufsize - priv->caller_nread); + + memcpy (priv->caller_buffer + priv->caller_nread, chunk, nread); + priv->caller_nread += nread; + priv->offset += nread; + chunk += nread; + chunk_size -= nread; + } + + if (chunk_size > 0) + { + /* Copy the rest into priv->leftover_buffer. If there's already + * some data there, realloc and append. Otherwise just copy. + */ + if (priv->leftover_bufsize) + { + priv->leftover_buffer = g_realloc (priv->leftover_buffer, + priv->leftover_bufsize + chunk_size); + memcpy (priv->leftover_buffer + priv->leftover_bufsize, + chunk, chunk_size); + priv->leftover_bufsize += chunk_size; + } + else + { + priv->leftover_bufsize = chunk_size; + priv->leftover_buffer = g_memdup (chunk, chunk_size); + priv->leftover_offset = 0; + } + } + + soup_session_pause_message (priv->session, msg); + if (priv->got_chunk_cb) + priv->got_chunk_cb (stream); +} + +static void +soup_input_stream_finished (SoupMessage *msg, gpointer stream) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + + priv->finished = TRUE; + + if (priv->finished_cb) + priv->finished_cb (stream); +} + +static gboolean +soup_input_stream_cancelled (GIOChannel *chan, GIOCondition condition, + gpointer stream) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + + priv->cancel_watch = NULL; + + soup_session_pause_message (priv->session, priv->msg); + if (priv->cancelled_cb) + priv->cancelled_cb (stream); + + return FALSE; +} + +static void +soup_input_stream_prepare_for_io (GInputStream *stream, + GCancellable *cancellable, + guchar *buffer, + gsize count) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + int cancel_fd; + + priv->cancellable = cancellable; + cancel_fd = g_cancellable_get_fd (cancellable); + if (cancel_fd != -1) + { + GIOChannel *chan = g_io_channel_unix_new (cancel_fd); + priv->cancel_watch = soup_add_io_watch (priv->async_context, chan, + G_IO_IN | G_IO_ERR | G_IO_HUP, + soup_input_stream_cancelled, + stream); + g_io_channel_unref (chan); + } + + priv->caller_buffer = buffer; + priv->caller_bufsize = count; + priv->caller_nread = 0; + + if (priv->got_headers) + soup_session_unpause_message (priv->session, priv->msg); +} + +static void +soup_input_stream_done_io (GInputStream *stream) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + + if (priv->cancel_watch) + { + g_source_destroy (priv->cancel_watch); + priv->cancel_watch = NULL; + } + priv->cancellable = NULL; + + priv->caller_buffer = NULL; + priv->caller_bufsize = 0; +} + +static gboolean +set_error_if_http_failed (SoupMessage *msg, GError **error) +{ + if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) + { + g_set_error_literal (error, SOUP_HTTP_ERROR, + msg->status_code, msg->reason_phrase); + return TRUE; + } + return FALSE; +} + +static gsize +read_from_leftover (SoupInputStreamPrivate *priv, + gpointer buffer, gsize bufsize) +{ + gsize nread; + + if (priv->leftover_bufsize - priv->leftover_offset <= bufsize) + { + nread = priv->leftover_bufsize - priv->leftover_offset; + memcpy (buffer, priv->leftover_buffer + priv->leftover_offset, nread); + + g_free (priv->leftover_buffer); + priv->leftover_buffer = NULL; + priv->leftover_bufsize = priv->leftover_offset = 0; + } + else + { + nread = bufsize; + memcpy (buffer, priv->leftover_buffer + priv->leftover_offset, nread); + priv->leftover_offset += nread; + } + + priv->offset += nread; + return nread; +} + +/* This does the work of soup_input_stream_send(), assuming that the + * GInputStream pending flag has already been set. It is also used by + * soup_input_stream_send_async() in some circumstances. + */ +static gboolean +soup_input_stream_send_internal (GInputStream *stream, + GCancellable *cancellable, + GError **error) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + + soup_input_stream_prepare_for_io (stream, cancellable, NULL, 0); + while (!priv->finished && !priv->got_headers && + !g_cancellable_is_cancelled (cancellable)) + g_main_context_iteration (priv->async_context, TRUE); + soup_input_stream_done_io (stream); + + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return FALSE; + else if (set_error_if_http_failed (priv->msg, error)) + return FALSE; + return TRUE; +} + +/** + * soup_input_stream_send: + * @stream: a #SoupInputStream + * @cancellable: optional #GCancellable object, %NULL to ignore. + * @error: location to store the error occuring, or %NULL to ignore + * + * Synchronously sends the HTTP request associated with @stream, and + * reads the response headers. Call this after soup_input_stream_new() + * and before the first g_input_stream_read() if you want to check the + * HTTP status code before you start reading. + * + * Return value: %TRUE if msg has a successful (2xx) status, %FALSE if + * not. + **/ +gboolean +soup_input_stream_send (GInputStream *stream, + GCancellable *cancellable, + GError **error) +{ + gboolean result; + + g_return_val_if_fail (SOUP_IS_INPUT_STREAM (stream), FALSE); + + if (!g_input_stream_set_pending (stream, error)) + return FALSE; + result = soup_input_stream_send_internal (stream, cancellable, error); + g_input_stream_clear_pending (stream); + + return result; +} + +static gssize +soup_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + + if (priv->finished) + return 0; + + /* If there is data leftover from a previous read, return it. */ + if (priv->leftover_bufsize) + return read_from_leftover (priv, buffer, count); + + /* No leftover data, accept one chunk from the network */ + soup_input_stream_prepare_for_io (stream, cancellable, buffer, count); + while (!priv->finished && priv->caller_nread == 0 && + !g_cancellable_is_cancelled (cancellable)) + g_main_context_iteration (priv->async_context, TRUE); + soup_input_stream_done_io (stream); + + if (priv->caller_nread > 0) + return priv->caller_nread; + + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return -1; + else if (set_error_if_http_failed (priv->msg, error)) + return -1; + else + return 0; +} + +static gboolean +soup_input_stream_close (GInputStream *stream, + GCancellable *cancellable, + GError **error) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + + if (!priv->finished) + soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED); + + return TRUE; +} + +static void +wrapper_callback (GObject *source_object, GAsyncResult *res, + gpointer user_data) +{ + GInputStream *stream = G_INPUT_STREAM (source_object); + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + + g_input_stream_clear_pending (stream); + if (priv->outstanding_callback) + (*priv->outstanding_callback) (source_object, res, user_data); + priv->outstanding_callback = NULL; + g_object_unref (stream); +} + +static void +send_async_thread (GSimpleAsyncResult *res, + GObject *object, + GCancellable *cancellable) +{ + GError *error = NULL; + gboolean success; + + success = soup_input_stream_send_internal (G_INPUT_STREAM (object), + cancellable, &error); + g_simple_async_result_set_op_res_gboolean (res, success); + if (error) + { + g_simple_async_result_set_from_error (res, error); + g_error_free (error); + } +} + +static void +soup_input_stream_send_async_in_thread (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GSimpleAsyncResult *res; + + res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, + soup_input_stream_send_async_in_thread); + g_simple_async_result_run_in_thread (res, send_async_thread, + io_priority, cancellable); + g_object_unref (res); +} + +static void +send_async_finished (GInputStream *stream) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + GSimpleAsyncResult *result; + GError *error = NULL; + + if (!g_cancellable_set_error_if_cancelled (priv->cancellable, &error)) + set_error_if_http_failed (priv->msg, &error); + + priv->got_headers_cb = NULL; + priv->finished_cb = NULL; + soup_input_stream_done_io (stream); + + result = priv->result; + priv->result = NULL; + + g_simple_async_result_set_op_res_gboolean (result, error == NULL); + if (error) + { + g_simple_async_result_set_from_error (result, error); + g_error_free (error); + } + g_simple_async_result_complete (result); +} + +static void +soup_input_stream_send_async_internal (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + + g_object_ref (stream); + priv->outstanding_callback = callback; + + /* If the session uses the default GMainContext, then we can do + * async I/O directly. But if it has its own main context, it's + * easier to just run it in another thread. + */ + if (soup_session_get_async_context (priv->session)) + { + soup_input_stream_send_async_in_thread (stream, io_priority, cancellable, + wrapper_callback, user_data); + return; + } + + priv->got_headers_cb = send_async_finished; + priv->finished_cb = send_async_finished; + + soup_input_stream_prepare_for_io (stream, cancellable, NULL, 0); + priv->result = g_simple_async_result_new (G_OBJECT (stream), + wrapper_callback, user_data, + soup_input_stream_send_async); +} + +/** + * soup_input_stream_send_async: + * @stream: a #SoupInputStream + * @io_priority: the io priority of the request. + * @cancellable: optional #GCancellable object, %NULL to ignore. + * @callback: callback to call when the request is satisfied + * @user_data: the data to pass to callback function + * + * Asynchronously sends the HTTP request associated with @stream, and + * reads the response headers. Call this after soup_input_stream_new() + * and before the first g_input_stream_read_async() if you want to + * check the HTTP status code before you start reading. + **/ +void +soup_input_stream_send_async (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GError *error = NULL; + + g_return_if_fail (SOUP_IS_INPUT_STREAM (stream)); + + if (!g_input_stream_set_pending (stream, &error)) + { + g_simple_async_report_gerror_in_idle (G_OBJECT (stream), + callback, + user_data, + error); + g_error_free (error); + return; + } + soup_input_stream_send_async_internal (stream, io_priority, cancellable, + callback, user_data); +} + +/** + * soup_input_stream_send_finish: + * @stream: a #SoupInputStream + * @result: a #GAsyncResult. + * @error: a #GError location to store the error occuring, or %NULL to + * ignore. + * + * Finishes a soup_input_stream_send_async() operation. + * + * Return value: %TRUE if the message was sent successfully and + * received a successful status code, %FALSE if not. + **/ +gboolean +soup_input_stream_send_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple; + + g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), FALSE); + simple = G_SIMPLE_ASYNC_RESULT (result); + + g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_input_stream_send_async, FALSE); + + if (g_simple_async_result_propagate_error (simple, error)) + return FALSE; + + return g_simple_async_result_get_op_res_gboolean (simple); +} + +static void +read_async_done (GInputStream *stream) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + GSimpleAsyncResult *result; + GError *error = NULL; + + result = priv->result; + priv->result = NULL; + + if (g_cancellable_set_error_if_cancelled (priv->cancellable, &error) || + set_error_if_http_failed (priv->msg, &error)) + { + g_simple_async_result_set_from_error (result, error); + g_error_free (error); + } + else + g_simple_async_result_set_op_res_gssize (result, priv->caller_nread); + + priv->got_chunk_cb = NULL; + priv->finished_cb = NULL; + priv->cancelled_cb = NULL; + soup_input_stream_done_io (stream); + + g_simple_async_result_complete (result); + g_object_unref (result); +} + +static void +soup_input_stream_read_async (GInputStream *stream, + void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + GSimpleAsyncResult *result; + + /* If the session uses the default GMainContext, then we can do + * async I/O directly. But if it has its own main context, we fall + * back to the async-via-sync-in-another-thread implementation. + */ + if (soup_session_get_async_context (priv->session)) + { + G_INPUT_STREAM_CLASS (soup_input_stream_parent_class)-> + read_async (stream, buffer, count, io_priority, + cancellable, callback, user_data); + return; + } + + result = g_simple_async_result_new (G_OBJECT (stream), + callback, user_data, + soup_input_stream_read_async); + + if (priv->finished) + { + g_simple_async_result_set_op_res_gssize (result, 0); + g_simple_async_result_complete_in_idle (result); + g_object_unref (result); + return; + } + + if (priv->leftover_bufsize) + { + gsize nread = read_from_leftover (priv, buffer, count); + g_simple_async_result_set_op_res_gssize (result, nread); + g_simple_async_result_complete_in_idle (result); + g_object_unref (result); + return; + } + + priv->result = result; + + priv->got_chunk_cb = read_async_done; + priv->finished_cb = read_async_done; + priv->cancelled_cb = read_async_done; + soup_input_stream_prepare_for_io (stream, cancellable, buffer, count); +} + +static gssize +soup_input_stream_read_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple; + + g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), -1); + simple = G_SIMPLE_ASYNC_RESULT (result); + g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_input_stream_read_async, -1); + + return g_simple_async_result_get_op_res_gssize (simple); +} + +static void +soup_input_stream_close_async (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GSimpleAsyncResult *result; + gboolean success; + GError *error = NULL; + + result = g_simple_async_result_new (G_OBJECT (stream), + callback, user_data, + soup_input_stream_close_async); + success = soup_input_stream_close (stream, cancellable, &error); + g_simple_async_result_set_op_res_gboolean (result, success); + if (error) + { + g_simple_async_result_set_from_error (result, error); + g_error_free (error); + } + + g_simple_async_result_complete_in_idle (result); + g_object_unref (result); +} + +static gboolean +soup_input_stream_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) +{ + /* Failures handled in generic close_finish code */ + return TRUE; +} + +static goffset +soup_input_stream_tell (GSeekable *seekable) +{ + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (seekable); + + return priv->offset; +} + +static gboolean +soup_input_stream_can_seek (GSeekable *seekable) +{ + return TRUE; +} + +extern void soup_message_io_cleanup (SoupMessage *msg); + +static gboolean +soup_input_stream_seek (GSeekable *seekable, + goffset offset, + GSeekType type, + GCancellable *cancellable, + GError **error) +{ + GInputStream *stream = G_INPUT_STREAM (seekable); + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (seekable); + char *range; + + if (type == G_SEEK_END) + { + /* FIXME: we could send "bytes=-offset", but unless we know the + * Content-Length, we wouldn't be able to answer a tell() properly. + * We could find the Content-Length by doing a HEAD... + */ + + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, + "G_SEEK_END not currently supported"); + return FALSE; + } + + if (!g_input_stream_set_pending (stream, error)) + return FALSE; + + soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED); + soup_message_io_cleanup (priv->msg); + + switch (type) + { + case G_SEEK_CUR: + offset += priv->offset; + /* fall through */ + + case G_SEEK_SET: + range = g_strdup_printf ("bytes=%"G_GUINT64_FORMAT"-", (guint64)offset); + priv->offset = offset; + break; + + case G_SEEK_END: + range = NULL; /* keep compilers happy */ + g_return_val_if_reached (FALSE); + break; + + default: + g_return_val_if_reached (FALSE); + } + + soup_message_headers_remove (priv->msg->request_headers, "Range"); + soup_message_headers_append (priv->msg->request_headers, "Range", range); + g_free (range); + + soup_input_stream_queue_message (SOUP_INPUT_STREAM (stream)); + + g_input_stream_clear_pending (stream); + return TRUE; +} + +static gboolean +soup_input_stream_can_truncate (GSeekable *seekable) +{ + return FALSE; +} + +static gboolean +soup_input_stream_truncate (GSeekable *seekable, + goffset offset, + GCancellable *cancellable, + GError **error) +{ + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, + "Truncate not allowed on input stream"); + return FALSE; +} + +GQuark +soup_http_error_quark (void) +{ + static GQuark error; + if (!error) + error = g_quark_from_static_string ("soup_http_error_quark"); + return error; +} |