diff options
author | Philip Withnall <philip.withnall@collabora.co.uk> | 2013-12-16 14:42:13 +0000 |
---|---|---|
committer | Olivier Crête <olivier.crete@collabora.com> | 2014-01-31 01:48:59 -0500 |
commit | 0ac9f3f90483acc9cc590f2cefea2e823011971d (patch) | |
tree | c4e6b901b050e4cc37425c52b1e1494759c80fe1 /agent/inputstream.c | |
parent | 6d3a32a0573db6e3fe4a1ad92c3f0678083c5cc3 (diff) | |
download | libnice-0ac9f3f90483acc9cc590f2cefea2e823011971d.tar.gz |
agent: Add GPollableInputStream support to NiceInputStream
Diffstat (limited to 'agent/inputstream.c')
-rw-r--r-- | agent/inputstream.c | 156 |
1 files changed, 153 insertions, 3 deletions
diff --git a/agent/inputstream.c b/agent/inputstream.c index de65f66..0e9f29b 100644 --- a/agent/inputstream.c +++ b/agent/inputstream.c @@ -60,13 +60,20 @@ # include "config.h" #endif +#include <errno.h> + #include "inputstream.h" #include "agent-priv.h" static void streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data); +static void nice_input_stream_init_pollable ( + GPollableInputStreamInterface *iface); -G_DEFINE_TYPE (NiceInputStream, nice_input_stream, G_TYPE_INPUT_STREAM); +G_DEFINE_TYPE_WITH_CODE (NiceInputStream, + nice_input_stream, G_TYPE_INPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, + nice_input_stream_init_pollable)); enum { @@ -89,6 +96,11 @@ static void nice_input_stream_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static gssize nice_input_stream_read (GInputStream *stream, void *buffer, gsize count, GCancellable *cancellable, GError **error); +static gboolean nice_input_stream_is_readable (GPollableInputStream *stream); +static gssize nice_input_stream_read_nonblocking (GPollableInputStream *stream, + void *buffer, gsize count, GError **error); +static GSource *nice_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable); static void nice_input_stream_class_init (NiceInputStreamClass *klass) @@ -239,6 +251,14 @@ nice_input_stream_init (NiceInputStream *stream) g_weak_ref_init (&stream->priv->agent_ref, NULL); } +static void +nice_input_stream_init_pollable (GPollableInputStreamInterface *iface) +{ + iface->is_readable = nice_input_stream_is_readable; + iface->read_nonblocking = nice_input_stream_read_nonblocking; + iface->create_source = nice_input_stream_create_source; +} + /** * nice_input_stream_new: * @agent: A #NiceAgent @@ -279,8 +299,11 @@ nice_input_stream_read (GInputStream *stream, void *buffer, gsize count, gssize len; /* Closed streams are not readable. */ - if (g_input_stream_is_closed (stream)) - return 0; + if (g_input_stream_is_closed (stream)) { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, + "Stream is closed."); + return -1; + } /* Has the agent disappeared? */ agent = g_weak_ref_get (&priv->agent_ref); @@ -298,6 +321,133 @@ nice_input_stream_read (GInputStream *stream, void *buffer, gsize count, return len; } +static gboolean +nice_input_stream_is_readable (GPollableInputStream *stream) +{ + NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv; + Component *component = NULL; + Stream *_stream = NULL; + gboolean retval = FALSE; + GSList *i; + NiceAgent *agent; /* owned */ + + /* Closed streams are not readable. */ + if (g_input_stream_is_closed (G_INPUT_STREAM (stream))) + return FALSE; + + /* Has the agent disappeared? */ + agent = g_weak_ref_get (&priv->agent_ref); + if (agent == NULL) + return FALSE; + + agent_lock (); + + if (!agent_find_component (agent, priv->stream_id, priv->component_id, + &_stream, &component)) { + g_warning ("Could not find component %u in stream %u", priv->component_id, + priv->stream_id); + goto done; + } + + /* If it’s a reliable agent, see if there’s any pending data in the pseudo-TCP + * buffer. */ + if (agent->reliable && component->tcp != NULL && + pseudo_tcp_socket_get_available_bytes (component->tcp) > 0) { + retval = TRUE; + goto done; + } + + /* Check whether any of the component’s FDs are pollable. */ + for (i = component->socket_sources; i != NULL; i = i->next) { + SocketSource *socket_source = i->data; + NiceSocket *socket = socket_source->socket; + + if (g_socket_condition_check (socket->fileno, G_IO_IN) != 0) { + retval = TRUE; + break; + } + } + +done: + agent_unlock (); + + g_object_unref (agent); + + return retval; +} + +static gssize +nice_input_stream_read_nonblocking (GPollableInputStream *stream, void *buffer, + gsize count, GError **error) +{ + NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv; + NiceAgent *agent; /* owned */ + gssize len; + + /* Closed streams are not readable. */ + if (g_input_stream_is_closed (G_INPUT_STREAM (stream))) { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, + "Stream is closed."); + return -1; + } + + /* Has the agent disappeared? */ + agent = g_weak_ref_get (&priv->agent_ref); + if (agent == NULL) { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, + "Stream is closed due to the NiceAgent being finalised."); + return -1; + } + + len = nice_agent_recv_nonblocking (agent, priv->stream_id, + priv->component_id, (guint8 *) buffer, count, NULL, error); + + g_object_unref (agent); + + return len; +} + +static GSource * +nice_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable) +{ + NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv; + GSource *component_source = NULL; + Component *component = NULL; + Stream *_stream = NULL; + NiceAgent *agent; /* owned */ + + /* Closed streams cannot have sources. */ + if (g_input_stream_is_closed (G_INPUT_STREAM (stream))) + return g_pollable_source_new (G_OBJECT (stream)); /* dummy */ + + /* Has the agent disappeared? */ + agent = g_weak_ref_get (&priv->agent_ref); + if (agent == NULL) + return g_pollable_source_new (G_OBJECT (stream)); /* dummy */ + + agent_lock (); + + /* Grab the socket for this component. */ + if (!agent_find_component (agent, priv->stream_id, priv->component_id, + &_stream, &component)) { + g_warning ("Could not find component %u in stream %u", priv->component_id, + priv->stream_id); + component_source = g_pollable_source_new (G_OBJECT (stream)); /* dummy */ + goto done; + } + + component_source = component_source_new (component, G_OBJECT (stream), + G_IO_IN, cancellable); + +done: + agent_unlock (); + + g_object_unref (agent); + + return component_source; +} + static void streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data) { |