summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOlivier CrĂȘte <olivier.crete@collabora.com>2014-02-24 23:02:39 -0500
committerOlivier CrĂȘte <olivier.crete@collabora.com>2014-02-25 00:51:31 -0500
commitee18973272b06e43eb146b72f41e6cc9a201b410 (patch)
tree6ca308379e92327623b44c5d1865bf3bd333e2c9
parent0cca15608aea922eb90c557f5cb38fd2ad416b69 (diff)
downloadlibnice-ee18973272b06e43eb146b72f41e6cc9a201b410.tar.gz
outputstream: Add a cancellable to get out of blocking write if the stream is removed
-rw-r--r--agent/outputstream.c25
1 files changed, 21 insertions, 4 deletions
diff --git a/agent/outputstream.c b/agent/outputstream.c
index 42e9aab..74574b4 100644
--- a/agent/outputstream.c
+++ b/agent/outputstream.c
@@ -92,6 +92,8 @@ struct _NiceOutputStreamPrivate
GWeakRef/*<NiceAgent>*/ agent_ref;
guint stream_id;
guint component_id;
+
+ GCancellable *closed_cancellable;
};
static void nice_output_stream_dispose (GObject *object);
@@ -191,6 +193,8 @@ nice_output_stream_dispose (GObject *object)
g_weak_ref_clear (&self->priv->agent_ref);
+ g_clear_object (&self->priv->closed_cancellable);
+
G_OBJECT_CLASS (nice_output_stream_parent_class)->dispose (object);
}
@@ -258,6 +262,7 @@ nice_output_stream_init (NiceOutputStream *stream)
NiceOutputStreamPrivate);
g_weak_ref_init (&stream->priv->agent_ref, NULL);
+ stream->priv->closed_cancellable = g_cancellable_new ();
}
static void
@@ -351,7 +356,7 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
gssize len = 0;
gint n_sent;
NiceAgent *agent = NULL; /* owned */
- gulong cancel_id = 0, writeable_id;
+ gulong cancel_id = 0, closed_cancel_id, writeable_id;
WriteData *write_data;
/* Closed streams are not writeable. */
@@ -379,7 +384,7 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
* GCond solution; would be much better for nice_agent_send() to block
* properly in the main loop. */
write_data = g_slice_new0 (WriteData);
- g_atomic_int_set (&write_data->ref_count, 3);
+ g_atomic_int_set (&write_data->ref_count, 4);
g_mutex_init (&write_data->mutex);
g_cond_init (&write_data->cond);
@@ -390,6 +395,10 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
(GDestroyNotify) write_data_unref);
}
+ closed_cancel_id = g_cancellable_connect (self->priv->closed_cancellable,
+ (GCallback) write_cancelled_cb, write_data,
+ (GDestroyNotify) write_data_unref);
+
g_mutex_lock (&write_data->mutex);
writeable_id = g_signal_connect_data (G_OBJECT (agent),
@@ -403,7 +412,8 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
* it will take the agent lock which will cause a deadlock if one of
* the callbacks is called.
*/
- if (g_cancellable_is_cancelled (cancellable))
+ if (g_cancellable_is_cancelled (cancellable) ||
+ g_cancellable_is_cancelled (self->priv->closed_cancellable))
break;
write_data->writable = FALSE;
@@ -428,10 +438,15 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
if (cancel_id)
g_cancellable_disconnect (cancellable, cancel_id);
+ g_cancellable_disconnect (self->priv->closed_cancellable, closed_cancel_id);
if (len == 0) {
- g_cancellable_set_error_if_cancelled (cancellable, error);
len = -1;
+ if (!g_cancellable_set_error_if_cancelled (cancellable, error)) {
+ if (g_cancellable_is_cancelled (self->priv->closed_cancellable))
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+ "Stream has been removed from agent");
+ }
}
write_data_unref (write_data);
@@ -608,6 +623,8 @@ streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
for (i = 0; stream_ids[i] != 0; i++) {
if (stream_ids[i] == self->priv->stream_id) {
/* The socket has been closed. */
+ g_cancellable_cancel (self->priv->closed_cancellable);
+
g_output_stream_close (G_OUTPUT_STREAM (self), NULL, NULL);
break;
}