diff options
Diffstat (limited to 'trunk/daemon/soup-output-stream.c')
-rw-r--r-- | trunk/daemon/soup-output-stream.c | 417 |
1 files changed, 417 insertions, 0 deletions
diff --git a/trunk/daemon/soup-output-stream.c b/trunk/daemon/soup-output-stream.c new file mode 100644 index 00000000..86ff8a43 --- /dev/null +++ b/trunk/daemon/soup-output-stream.c @@ -0,0 +1,417 @@ +/* soup-output-stream.c, based on gunixoutputstream.c + * + * Copyright (C) 2006-2008 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include <config.h> + +#include <string.h> + +#include <glib.h> +#include <gio/gio.h> + +#include <libsoup/soup.h> + +#include "soup-output-stream.h" +#include "soup-input-stream.h" + +G_DEFINE_TYPE (SoupOutputStream, soup_output_stream, G_TYPE_OUTPUT_STREAM) + +typedef void (*SoupOutputStreamCallback) (GOutputStream *); + +typedef struct { + SoupSession *session; + GMainContext *async_context; + SoupMessage *msg; + gboolean finished; + + goffset size, offset; + GByteArray *ba; + + GCancellable *cancellable; + GSource *cancel_watch; + SoupOutputStreamCallback finished_cb; + SoupOutputStreamCallback cancelled_cb; + + GSimpleAsyncResult *result; +} SoupOutputStreamPrivate; +#define SOUP_OUTPUT_STREAM_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_OUTPUT_STREAM, SoupOutputStreamPrivate)) + +static gssize soup_output_stream_write (GOutputStream *stream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error); +static gboolean soup_output_stream_close (GOutputStream *stream, + GCancellable *cancellable, + GError **error); +static void soup_output_stream_write_async (GOutputStream *stream, + const void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gssize soup_output_stream_write_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error); +static void soup_output_stream_close_async (GOutputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gboolean soup_output_stream_close_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error); + +static void soup_output_stream_finished (SoupMessage *msg, gpointer stream); + +static void +soup_output_stream_finalize (GObject *object) +{ + SoupOutputStreamPrivate *priv = SOUP_OUTPUT_STREAM_GET_PRIVATE (object); + + g_object_unref (priv->session); + + g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_output_stream_finished), object); + g_object_unref (priv->msg); + + if (priv->ba) + g_byte_array_free (priv->ba, TRUE); + + if (G_OBJECT_CLASS (soup_output_stream_parent_class)->finalize) + (*G_OBJECT_CLASS (soup_output_stream_parent_class)->finalize) (object); +} + +static void +soup_output_stream_class_init (SoupOutputStreamClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass); + + g_type_class_add_private (klass, sizeof (SoupOutputStreamPrivate)); + + gobject_class->finalize = soup_output_stream_finalize; + + stream_class->write_fn = soup_output_stream_write; + stream_class->close_fn = soup_output_stream_close; + stream_class->write_async = soup_output_stream_write_async; + stream_class->write_finish = soup_output_stream_write_finish; + stream_class->close_async = soup_output_stream_close_async; + stream_class->close_finish = soup_output_stream_close_finish; +} + +static void +soup_output_stream_init (SoupOutputStream *stream) +{ + SoupOutputStreamPrivate *priv = SOUP_OUTPUT_STREAM_GET_PRIVATE (stream); + + priv->ba = g_byte_array_new (); +} + + +/** + * soup_output_stream_new: + * @session: the #SoupSession to use + * @msg: the #SoupMessage whose request will be streamed + * @size: the total size of the request body, or -1 if not known + * + * Prepares to send @msg over @session, and returns a #GOutputStream + * that can be used to write the response. The server's response will + * be available in @msg after calling soup_output_stream_close() + * (which will return a %SOUP_OUTPUT_STREAM_HTTP_ERROR #GError if the + * status is not 2xx). + * + * If you know the total number of bytes that will be written, pass + * that in @size. Otherwise, pass -1. (If you pass a size, you MUST + * write that many bytes to the stream; Trying to write more than + * that, or closing the stream without having written enough, will + * result in an error. + * + * In some situations, the request will not actually be sent until you + * call g_output_stream_close(). (In fact, currently this is *always* + * true.) + * + * Internally, #SoupOutputStream is implemented using asynchronous + * I/O, so if you are using the synchronous API (eg, + * g_output_stream_write()), you should create a new #GMainContext and + * set it as the %SOUP_SESSION_ASYNC_CONTEXT property on @session. (If + * you don't, then synchronous #GOutputStream calls will cause the + * main loop to be run recursively.) The async #GOutputStream API + * works fine with %SOUP_SESSION_ASYNC_CONTEXT either set or unset. + * + * Returns: a new #GOutputStream. + **/ +GOutputStream * +soup_output_stream_new (SoupSession *session, SoupMessage *msg, goffset size) +{ + SoupOutputStream *stream; + SoupOutputStreamPrivate *priv; + + g_return_val_if_fail (SOUP_IS_MESSAGE (msg), NULL); + + stream = g_object_new (SOUP_TYPE_OUTPUT_STREAM, NULL); + priv = SOUP_OUTPUT_STREAM_GET_PRIVATE (stream); + + priv->session = g_object_ref (session); + priv->async_context = soup_session_get_async_context (session); + priv->msg = g_object_ref (msg); + priv->size = size; + + return G_OUTPUT_STREAM (stream); +} + +static gboolean +soup_output_stream_cancelled (GIOChannel *chan, GIOCondition condition, + gpointer stream) +{ + SoupOutputStreamPrivate *priv = SOUP_OUTPUT_STREAM_GET_PRIVATE (stream); + + priv->cancel_watch = NULL; + + soup_session_pause_message (priv->session, priv->msg); + if (priv->cancelled_cb) + priv->cancelled_cb (stream); + + return FALSE; +} + +static void +soup_output_stream_prepare_for_io (GOutputStream *stream, GCancellable *cancellable) +{ + SoupOutputStreamPrivate *priv = SOUP_OUTPUT_STREAM_GET_PRIVATE (stream); + int cancel_fd; + + /* Move the buffer to the SoupMessage */ + soup_message_body_append (priv->msg->request_body, SOUP_MEMORY_TAKE, + priv->ba->data, priv->ba->len); + g_byte_array_free (priv->ba, FALSE); + priv->ba = NULL; + + /* Set up cancellation */ + priv->cancellable = cancellable; + cancel_fd = g_cancellable_get_fd (cancellable); + if (cancel_fd != -1) + { + GIOChannel *chan = g_io_channel_unix_new (cancel_fd); + priv->cancel_watch = soup_add_io_watch (priv->async_context, chan, + G_IO_IN | G_IO_ERR | G_IO_HUP, + soup_output_stream_cancelled, + stream); + g_io_channel_unref (chan); + } + + /* Add an extra ref since soup_session_queue_message steals one */ + g_object_ref (priv->msg); + soup_session_queue_message (priv->session, priv->msg, NULL, NULL); +} + +static void +soup_output_stream_done_io (GOutputStream *stream) +{ + SoupOutputStreamPrivate *priv = SOUP_OUTPUT_STREAM_GET_PRIVATE (stream); + + if (priv->cancel_watch) + { + g_source_destroy (priv->cancel_watch); + priv->cancel_watch = NULL; + } + priv->cancellable = NULL; +} + +static gboolean +set_error_if_http_failed (SoupMessage *msg, GError **error) +{ + if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) + { + g_set_error_literal (error, SOUP_HTTP_ERROR, + msg->status_code, msg->reason_phrase); + return TRUE; + } + return FALSE; +} + +static gssize +soup_output_stream_write (GOutputStream *stream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + SoupOutputStreamPrivate *priv = SOUP_OUTPUT_STREAM_GET_PRIVATE (stream); + + if (priv->size > 0 && priv->offset + count > priv->size) { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NO_SPACE, + "Write would exceed caller-defined file size"); + return -1; + } + + g_byte_array_append (priv->ba, buffer, count); + priv->offset += count; + return count; +} + +static int +soup_output_stream_close (GOutputStream *stream, + GCancellable *cancellable, + GError **error) +{ + SoupOutputStreamPrivate *priv = SOUP_OUTPUT_STREAM_GET_PRIVATE (stream); + + if (priv->size > 0 && priv->offset != priv->size) { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NO_SPACE, + "File is incomplete"); + return -1; + } + + soup_output_stream_prepare_for_io (stream, cancellable); + while (!priv->finished && !g_cancellable_is_cancelled (cancellable)) + g_main_context_iteration (priv->async_context, TRUE); + soup_output_stream_done_io (stream); + + return !set_error_if_http_failed (priv->msg, error); +} + +static void +soup_output_stream_write_async (GOutputStream *stream, + const void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + SoupOutputStreamPrivate *priv = SOUP_OUTPUT_STREAM_GET_PRIVATE (stream); + GSimpleAsyncResult *result; + + result = g_simple_async_result_new (G_OBJECT (stream), + callback, user_data, + soup_output_stream_write_async); + + if (priv->size > 0 && priv->offset + count > priv->size) + { + GError *error; + + error = g_error_new (G_IO_ERROR, G_IO_ERROR_NO_SPACE, + "Write would exceed caller-defined file size"); + g_simple_async_result_set_from_error (result, error); + g_error_free (error); + } + else + { + g_byte_array_append (priv->ba, buffer, count); + priv->offset += count; + g_simple_async_result_set_op_res_gssize (result, count); + } + + g_simple_async_result_complete_in_idle (result); +} + +static gssize +soup_output_stream_write_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple; + gssize nwritten; + + simple = G_SIMPLE_ASYNC_RESULT (result); + g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == soup_output_stream_write_async); + + nwritten = g_simple_async_result_get_op_res_gssize (simple); + return nwritten; +} + +static void +close_async_done (GOutputStream *stream) +{ + SoupOutputStreamPrivate *priv = SOUP_OUTPUT_STREAM_GET_PRIVATE (stream); + GSimpleAsyncResult *result; + GError *error = NULL; + + result = priv->result; + priv->result = NULL; + + if (g_cancellable_set_error_if_cancelled (priv->cancellable, &error) || + set_error_if_http_failed (priv->msg, &error)) + { + g_simple_async_result_set_from_error (result, error); + g_error_free (error); + } + else + g_simple_async_result_set_op_res_gboolean (result, TRUE); + + priv->finished_cb = NULL; + priv->cancelled_cb = NULL; + soup_output_stream_done_io (stream); + + g_simple_async_result_complete (result); +} + +static void +soup_output_stream_finished (SoupMessage *msg, gpointer stream) +{ + SoupOutputStreamPrivate *priv = SOUP_OUTPUT_STREAM_GET_PRIVATE (stream); + + priv->finished = TRUE; + + g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_output_stream_finished), stream); + close_async_done (stream); +} + +static void +soup_output_stream_close_async (GOutputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + SoupOutputStreamPrivate *priv = SOUP_OUTPUT_STREAM_GET_PRIVATE (stream); + GSimpleAsyncResult *result; + + result = g_simple_async_result_new (G_OBJECT (stream), + callback, user_data, + soup_output_stream_close_async); + + if (priv->size > 0 && priv->offset != priv->size) + { + GError *error; + + error = g_error_new (G_IO_ERROR, G_IO_ERROR_NO_SPACE, + "File is incomplete"); + g_simple_async_result_set_from_error (result, error); + g_error_free (error); + g_simple_async_result_complete_in_idle (result); + return; + } + + priv->result = result; + priv->cancelled_cb = close_async_done; + g_signal_connect (priv->msg, "finished", + G_CALLBACK (soup_output_stream_finished), stream); + soup_output_stream_prepare_for_io (stream, cancellable); +} + +static gboolean +soup_output_stream_close_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + /* Failures handled in generic close_finish code */ + return TRUE; +} |