summaryrefslogtreecommitdiff
path: root/libsoup/soup-message-io.c
diff options
context:
space:
mode:
authorDan Winship <danw@gnome.org>2014-06-09 08:52:38 -0400
committerDan Winship <danw@gnome.org>2014-11-02 11:27:12 -0500
commitadd13ea00e31f9f78d8dd4b26109575a7500dcf6 (patch)
treebc144797263ef6938bddd4af136aadf115613627 /libsoup/soup-message-io.c
parentaf73c4f92628576de71a94baf493e9f26632014b (diff)
downloadlibsoup-add13ea00e31f9f78d8dd4b26109575a7500dcf6.tar.gz
soup-message-io: do an async close when doing non-blocking I/O
When using chunked encoding, SoupBodyOutputStream needs to write the final "0" chunk when it's closed, and thus may block. So we have to do an async close in the non-blocking case. (This also requires changing continue-test to not trace "finished" events, since it becomes unpredicatable exactly when they'll happen now.) https://bugzilla.gnome.org/show_bug.cgi?id=727138
Diffstat (limited to 'libsoup/soup-message-io.c')
-rw-r--r--libsoup/soup-message-io.c71
1 files changed, 64 insertions, 7 deletions
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 600cd85b..db98dc28 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -80,6 +80,9 @@ typedef struct {
GSource *unpause_source;
gboolean paused;
+ GCancellable *async_close_wait;
+ GError *async_close_error;
+
SoupMessageGetHeadersFn get_headers_cb;
SoupMessageParseHeadersFn parse_headers_cb;
gpointer header_data;
@@ -87,6 +90,7 @@ typedef struct {
gpointer completion_data;
} SoupMessageIOData;
+static void io_run (SoupMessage *msg, gboolean blocking);
#define RESPONSE_BLOCK_SIZE 8192
@@ -275,6 +279,33 @@ soup_message_setup_body_istream (GInputStream *body_stream,
return istream;
}
+static void
+closed_async (GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ GOutputStream *body_ostream = G_OUTPUT_STREAM (source);
+ SoupMessage *msg = user_data;
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+ GCancellable *async_close_wait;
+
+ if (!io || !io->async_close_wait || io->body_ostream != body_ostream) {
+ g_object_unref (msg);
+ return;
+ }
+
+ g_output_stream_close_finish (body_ostream, result, &io->async_close_error);
+ g_clear_object (&io->body_ostream);
+
+ async_close_wait = io->async_close_wait;
+ io->async_close_wait = NULL;
+ g_cancellable_cancel (async_close_wait);
+ g_object_unref (async_close_wait);
+
+ g_object_unref (msg);
+}
+
/*
* There are two request/response formats: the basic request/response,
* possibly with one or more unsolicited informational responses (such
@@ -316,6 +347,17 @@ io_write (SoupMessage *msg, gboolean blocking,
SoupBuffer *chunk;
gssize nwrote;
+ if (io->async_close_error) {
+ g_propagate_error (error, io->async_close_error);
+ io->async_close_error = NULL;
+ return FALSE;
+ } else if (io->async_close_wait) {
+ g_set_error_literal (error, G_IO_ERROR,
+ G_IO_ERROR_WOULD_BLOCK,
+ _("Operation would block"));
+ return FALSE;
+ }
+
switch (io->write_state) {
case SOUP_MESSAGE_IO_STATE_HEADERS:
if (!io->write_buf->len) {
@@ -460,13 +502,27 @@ io_write (SoupMessage *msg, gboolean blocking,
case SOUP_MESSAGE_IO_STATE_BODY_DONE:
if (io->body_ostream) {
- if (!g_output_stream_close (io->body_ostream, cancellable, error))
- return FALSE;
- g_clear_object (&io->body_ostream);
+ if (blocking) {
+ if (!g_output_stream_close (io->body_ostream, cancellable, error))
+ return FALSE;
+ g_clear_object (&io->body_ostream);
+ } else {
+ io->async_close_wait = g_cancellable_new ();
+ if (io->async_context)
+ g_main_context_push_thread_default (io->async_context);
+ g_output_stream_close_async (io->body_ostream,
+ G_PRIORITY_DEFAULT, cancellable,
+ closed_async, g_object_ref (msg));
+ if (io->async_context)
+ g_main_context_pop_thread_default (io->async_context);
+ }
}
io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
soup_message_wrote_body (msg);
+
+ if (io->async_close_wait)
+ return TRUE;
break;
@@ -782,6 +838,8 @@ soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
base_source = g_timeout_source_new (0);
} else if (io->paused) {
base_source = NULL;
+ } else if (io->async_close_wait) {
+ base_source = g_cancellable_source_new (io->async_close_wait);
} else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
GPollableInputStream *istream;
@@ -857,7 +915,7 @@ io_run_until (SoupMessage *msg, gboolean blocking,
g_object_ref (msg);
- while (progress && priv->io_data == io && !io->paused &&
+ while (progress && priv->io_data == io && !io->paused && !io->async_close_wait &&
(io->read_state < read_state || io->write_state < write_state)) {
if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
@@ -881,7 +939,8 @@ io_run_until (SoupMessage *msg, gboolean blocking,
g_propagate_error (error, my_error);
g_object_unref (msg);
return FALSE;
- } else if (g_cancellable_set_error_if_cancelled (cancellable, error)) {
+ } else if (!io->async_close_wait &&
+ g_cancellable_set_error_if_cancelled (cancellable, error)) {
g_object_unref (msg);
return FALSE;
} else if (priv->io_data != io) {
@@ -907,8 +966,6 @@ io_run_until (SoupMessage *msg, gboolean blocking,
return done;
}
-static void io_run (SoupMessage *msg, gboolean blocking);
-
static gboolean
io_run_ready (SoupMessage *msg, gpointer user_data)
{