summaryrefslogtreecommitdiff
path: root/libsoup/soup-message-io.c
diff options
context:
space:
mode:
authorDan Winship <danw@gnome.org>2012-01-26 16:25:57 -0500
committerDan Winship <danw@gnome.org>2012-04-17 21:26:26 -0400
commit9effb5ca942412ecde9242c745f2df6da80853a3 (patch)
tree335e8db3779f533cae4bad15cb39c6c9996e27c1 /libsoup/soup-message-io.c
parent3f1180b9453899464b0ae49515cddeb8ae3abc7f (diff)
downloadlibsoup-9effb5ca942412ecde9242c745f2df6da80853a3.tar.gz
SoupHTTPRequest: O brave new world!
Kill SoupHTTPInputStream, and have SoupHTTPRequest return the message's body_istream directly (with a little help from SoupSession and its subclasses). SoupHTTPRequest works synchronously now as well (though it's still the case that async only works with SoupSessionAsync and sync only works with SoupSessionSync). https://bugzilla.gnome.org/show_bug.cgi?id=591739
Diffstat (limited to 'libsoup/soup-message-io.c')
-rw-r--r--libsoup/soup-message-io.c310
1 files changed, 248 insertions, 62 deletions
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index e71a8adf..4e5c35f7 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -12,8 +12,11 @@
#include <stdlib.h>
#include <string.h>
+#include <glib/gi18n-lib.h>
+
#include "soup-body-input-stream.h"
#include "soup-body-output-stream.h"
+#include "soup-client-input-stream.h"
#include "soup-connection.h"
#include "soup-content-sniffer-stream.h"
#include "soup-converter-wrapper.h"
@@ -31,6 +34,7 @@ typedef enum {
typedef enum {
SOUP_MESSAGE_IO_STATE_NOT_STARTED,
+ SOUP_MESSAGE_IO_STATE_ANY = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
SOUP_MESSAGE_IO_STATE_HEADERS,
SOUP_MESSAGE_IO_STATE_BLOCKING,
SOUP_MESSAGE_IO_STATE_BODY_START,
@@ -45,6 +49,9 @@ typedef enum {
(state != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
state != SOUP_MESSAGE_IO_STATE_BLOCKING && \
state != SOUP_MESSAGE_IO_STATE_DONE)
+#define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
+ (SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
+ state != SOUP_MESSAGE_IO_STATE_BODY_DONE)
typedef struct {
SoupMessageQueueItem *item;
@@ -136,11 +143,13 @@ soup_message_io_stop (SoupMessage *msg)
if (io->io_source) {
g_source_destroy (io->io_source);
+ g_source_unref (io->io_source);
io->io_source = NULL;
}
if (io->unpause_source) {
g_source_destroy (io->unpause_source);
+ g_source_unref (io->unpause_source);
io->unpause_source = NULL;
}
@@ -215,8 +224,12 @@ read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
&got_lf,
cancellable, error);
io->read_header_buf->len = old_len + MAX (nread, 0);
- if (nread == 0)
- io_error (io->sock, msg, NULL);
+ if (nread == 0) {
+ soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
+ g_set_error_literal (error, G_IO_ERROR,
+ G_IO_ERROR_PARTIAL_INPUT,
+ _("Connection terminated unexpectedly"));
+ }
if (nread <= 0)
return FALSE;
@@ -257,9 +270,10 @@ setup_body_istream (SoupMessage *msg)
GInputStream *filter;
GSList *d;
- io->body_istream = soup_body_input_stream_new (io->istream,
- io->read_encoding,
- io->read_length);
+ io->body_istream =
+ soup_body_input_stream_new (io->istream,
+ io->read_encoding,
+ io->read_length);
for (d = priv->decoders; d; d = d->next) {
decoder = d->data;
@@ -321,11 +335,6 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
gssize nwrote;
switch (io->write_state) {
- case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
- case SOUP_MESSAGE_IO_STATE_BLOCKING:
- return FALSE;
-
-
case SOUP_MESSAGE_IO_STATE_HEADERS:
if (!io->write_buf->len) {
io->get_headers_cb (msg, io->write_buf,
@@ -416,6 +425,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
if (!io->write_chunk) {
io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
if (!io->write_chunk) {
+ g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
soup_message_io_pause (msg);
return FALSE;
}
@@ -486,7 +496,6 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
break;
- case SOUP_MESSAGE_IO_STATE_DONE:
default:
g_return_val_if_reached (FALSE);
}
@@ -511,11 +520,6 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
guint status;
switch (io->read_state) {
- case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
- case SOUP_MESSAGE_IO_STATE_BLOCKING:
- return FALSE;
-
-
case SOUP_MESSAGE_IO_STATE_HEADERS:
if (!read_headers (msg, cancellable, error))
return FALSE;
@@ -628,6 +632,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
if (priv->chunk_allocator) {
buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
if (!buffer) {
+ g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
soup_message_io_pause (msg);
return FALSE;
}
@@ -675,7 +680,6 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
break;
- case SOUP_MESSAGE_IO_STATE_DONE:
default:
g_return_val_if_reached (FALSE);
}
@@ -683,43 +687,160 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
return TRUE;
}
-static GSource *
+typedef struct {
+ GSource source;
+ SoupMessage *msg;
+} SoupMessageSource;
+
+static gboolean
+message_source_prepare (GSource *source,
+ gint *timeout)
+{
+ *timeout = -1;
+ return FALSE;
+}
+
+static gboolean
+message_source_check (GSource *source)
+{
+ return FALSE;
+}
+
+static gboolean
+message_source_dispatch (GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
+ SoupMessageSource *message_source = (SoupMessageSource *)source;
+
+ return (*func) (message_source->msg, user_data);
+}
+
+static void
+message_source_finalize (GSource *source)
+{
+ SoupMessageSource *message_source = (SoupMessageSource *)source;
+
+ g_object_unref (message_source->msg);
+}
+
+static gboolean
+message_source_closure_callback (SoupMessage *msg,
+ gpointer data)
+{
+ GClosure *closure = data;
+
+ GValue param = G_VALUE_INIT;
+ GValue result_value = G_VALUE_INIT;
+ gboolean result;
+
+ g_value_init (&result_value, G_TYPE_BOOLEAN);
+
+ g_value_init (&param, SOUP_TYPE_MESSAGE);
+ g_value_set_object (&param, msg);
+
+ g_closure_invoke (closure, &result_value, 1, &param, NULL);
+
+ result = g_value_get_boolean (&result_value);
+ g_value_unset (&result_value);
+ g_value_unset (&param);
+
+ return result;
+}
+
+static GSourceFuncs message_source_funcs =
+{
+ message_source_prepare,
+ message_source_check,
+ message_source_dispatch,
+ message_source_finalize,
+ (GSourceFunc)message_source_closure_callback,
+ (GSourceDummyMarshal)g_cclosure_marshal_generic,
+};
+
+GSource *
soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
- GSourceFunc callback, gpointer user_data)
+ SoupMessageSourceFunc callback, gpointer user_data)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
- GSource *source;
-
- if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) {
- source = g_pollable_input_stream_create_source (
- G_POLLABLE_INPUT_STREAM (io->istream), cancellable);
- } else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) {
- source = g_pollable_output_stream_create_source (
- G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
+ GSource *base_source, *source;
+ SoupMessageSource *message_source;
+
+ if (io && SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
+ GPollableInputStream *istream;
+
+ if (io->body_istream)
+ istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
+ else
+ istream = G_POLLABLE_INPUT_STREAM (io->istream);
+ base_source = g_pollable_input_stream_create_source (istream, cancellable);
+ } else if (io && SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
+ GPollableOutputStream *ostream;
+
+ if (io->body_ostream)
+ ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
+ else
+ ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
+ base_source = g_pollable_output_stream_create_source (ostream, cancellable);
} else
- g_return_val_if_reached (NULL);
-
- g_source_set_callback (source, callback, user_data, NULL);
+ base_source = g_timeout_source_new (0);
+
+ g_source_set_dummy_callback (base_source);
+ source = g_source_new (&message_source_funcs,
+ sizeof (SoupMessageSource));
+ g_source_set_name (source, "SoupMessageSource");
+ message_source = (SoupMessageSource *)source;
+ message_source->msg = g_object_ref (msg);
+
+ g_source_add_child_source (source, base_source);
+ g_source_unref (base_source);
+ g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
return source;
}
-static gboolean io_run (GObject *stream, SoupMessage *msg);
-
-static void
-setup_io_source (SoupMessage *msg)
+static gboolean
+io_run_until (SoupMessage *msg,
+ SoupMessageIOState read_state, SoupMessageIOState write_state,
+ GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
+ gboolean progress = TRUE, done;
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return FALSE;
+ else if (!io) {
+ g_set_error_literal (error, G_IO_ERROR,
+ G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ return FALSE;
+ }
+
+ g_object_ref (msg);
- io->io_source = soup_message_io_get_source (msg, NULL,
- (GSourceFunc)io_run, msg);
- g_source_attach (io->io_source, io->async_context);
- g_source_unref (io->io_source);
+ while (progress && priv->io_data == io && !io->paused &&
+ (io->read_state < read_state || io->write_state < write_state)) {
+
+ if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
+ progress = io_read (msg, cancellable, error);
+ else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
+ progress = io_write (msg, cancellable, error);
+ else
+ progress = FALSE;
+ }
+
+ done = (priv->io_data == io &&
+ io->read_state >= read_state &&
+ io->write_state >= write_state);
+
+ g_object_unref (msg);
+ return done;
}
static gboolean
-io_run (GObject *stream, SoupMessage *msg)
+io_run (SoupMessage *msg, gpointer user_data)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
@@ -727,36 +848,99 @@ io_run (GObject *stream, SoupMessage *msg)
if (io->io_source) {
g_source_destroy (io->io_source);
+ g_source_unref (io->io_source);
io->io_source = NULL;
}
g_object_ref (msg);
- while (priv->io_data == io && !io->paused) {
- gboolean progress = FALSE;
+ if (io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ io->cancellable, &error)) {
+ soup_message_io_finished (msg);
+ } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_clear_error (&error);
+ io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg);
+ g_source_attach (io->io_source, io->async_context);
+ } else if (error) {
+ io_error (io->sock, msg, error);
+ }
- if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
- progress = io_read (msg, io->cancellable, &error);
- else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
- progress = io_write (msg, io->cancellable, &error);
+ g_object_unref (msg);
+ return FALSE;
+}
- if (!progress)
- break;
- }
+gboolean
+soup_message_io_run_until_write (SoupMessage *msg,
+ GCancellable *cancellable, GError **error)
+{
+ return io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_ANY,
+ SOUP_MESSAGE_IO_STATE_BODY,
+ cancellable, error);
+}
- if (error) {
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- g_clear_error (&error);
- setup_io_source (msg);
- } else
- io_error (io->sock, msg, error);
- } else if (priv->io_data == io &&
- io->read_state == SOUP_MESSAGE_IO_STATE_DONE &&
- io->write_state == SOUP_MESSAGE_IO_STATE_DONE)
- soup_message_io_finished (msg);
+gboolean
+soup_message_io_run_until_read (SoupMessage *msg,
+ GCancellable *cancellable, GError **error)
+{
+ return io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_BODY,
+ SOUP_MESSAGE_IO_STATE_ANY,
+ cancellable, error);
+}
+gboolean
+soup_message_io_run_until_finish (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error)
+{
+ g_object_ref (msg);
+
+ if (!io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ cancellable, error))
+ return FALSE;
+
+ soup_message_io_finished (msg);
g_object_unref (msg);
- return FALSE;
+ return TRUE;
+}
+
+static void
+client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
+{
+ SoupMessage *msg = user_data;
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+
+ if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
+ io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
+}
+
+GInputStream *
+soup_message_io_get_response_istream (SoupMessage *msg,
+ GError **error)
+{
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+ GInputStream *client_stream;
+
+ g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
+
+ if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
+ g_set_error_literal (error, SOUP_HTTP_ERROR,
+ msg->status_code, msg->reason_phrase);
+ return NULL;
+ }
+
+ client_stream = soup_client_input_stream_new (io->body_istream, msg);
+ g_signal_connect (client_stream, "eof",
+ G_CALLBACK (client_stream_eof), msg);
+
+ return client_stream;
}
@@ -839,7 +1023,8 @@ soup_message_io_client (SoupMessageQueueItem *item,
io->write_body = item->msg->request_body;
io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_run (NULL, item->msg);
+ if (!item->new_api)
+ io_run (item->msg, NULL);
}
void
@@ -860,7 +1045,7 @@ soup_message_io_server (SoupMessage *msg, SoupSocket *sock,
io->write_body = msg->response_body;
io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_run (NULL, msg);
+ io_run (msg, NULL);
}
void
@@ -873,6 +1058,7 @@ soup_message_io_pause (SoupMessage *msg)
if (io->io_source) {
g_source_destroy (io->io_source);
+ g_source_unref (io->io_source);
io->io_source = NULL;
}
@@ -897,7 +1083,7 @@ io_unpause_internal (gpointer msg)
if (io->io_source)
return FALSE;
- io_run (NULL, msg);
+ io_run (msg, NULL);
return FALSE;
}