summaryrefslogtreecommitdiff
path: root/agent/inputstream.c
diff options
context:
space:
mode:
authorPhilip Withnall <philip.withnall@collabora.co.uk>2013-12-16 14:42:13 +0000
committerOlivier Crête <olivier.crete@collabora.com>2014-01-31 01:48:59 -0500
commit0ac9f3f90483acc9cc590f2cefea2e823011971d (patch)
treec4e6b901b050e4cc37425c52b1e1494759c80fe1 /agent/inputstream.c
parent6d3a32a0573db6e3fe4a1ad92c3f0678083c5cc3 (diff)
downloadlibnice-0ac9f3f90483acc9cc590f2cefea2e823011971d.tar.gz
agent: Add GPollableInputStream support to NiceInputStream
Diffstat (limited to 'agent/inputstream.c')
-rw-r--r--agent/inputstream.c156
1 files changed, 153 insertions, 3 deletions
diff --git a/agent/inputstream.c b/agent/inputstream.c
index de65f66..0e9f29b 100644
--- a/agent/inputstream.c
+++ b/agent/inputstream.c
@@ -60,13 +60,20 @@
# include "config.h"
#endif
+#include <errno.h>
+
#include "inputstream.h"
#include "agent-priv.h"
static void streams_removed_cb (NiceAgent *agent, guint *stream_ids,
gpointer user_data);
+static void nice_input_stream_init_pollable (
+ GPollableInputStreamInterface *iface);
-G_DEFINE_TYPE (NiceInputStream, nice_input_stream, G_TYPE_INPUT_STREAM);
+G_DEFINE_TYPE_WITH_CODE (NiceInputStream,
+ nice_input_stream, G_TYPE_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ nice_input_stream_init_pollable));
enum
{
@@ -89,6 +96,11 @@ static void nice_input_stream_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
static gssize nice_input_stream_read (GInputStream *stream, void *buffer,
gsize count, GCancellable *cancellable, GError **error);
+static gboolean nice_input_stream_is_readable (GPollableInputStream *stream);
+static gssize nice_input_stream_read_nonblocking (GPollableInputStream *stream,
+ void *buffer, gsize count, GError **error);
+static GSource *nice_input_stream_create_source (GPollableInputStream *stream,
+ GCancellable *cancellable);
static void
nice_input_stream_class_init (NiceInputStreamClass *klass)
@@ -239,6 +251,14 @@ nice_input_stream_init (NiceInputStream *stream)
g_weak_ref_init (&stream->priv->agent_ref, NULL);
}
+static void
+nice_input_stream_init_pollable (GPollableInputStreamInterface *iface)
+{
+ iface->is_readable = nice_input_stream_is_readable;
+ iface->read_nonblocking = nice_input_stream_read_nonblocking;
+ iface->create_source = nice_input_stream_create_source;
+}
+
/**
* nice_input_stream_new:
* @agent: A #NiceAgent
@@ -279,8 +299,11 @@ nice_input_stream_read (GInputStream *stream, void *buffer, gsize count,
gssize len;
/* Closed streams are not readable. */
- if (g_input_stream_is_closed (stream))
- return 0;
+ if (g_input_stream_is_closed (stream)) {
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+ "Stream is closed.");
+ return -1;
+ }
/* Has the agent disappeared? */
agent = g_weak_ref_get (&priv->agent_ref);
@@ -298,6 +321,133 @@ nice_input_stream_read (GInputStream *stream, void *buffer, gsize count,
return len;
}
+static gboolean
+nice_input_stream_is_readable (GPollableInputStream *stream)
+{
+ NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
+ Component *component = NULL;
+ Stream *_stream = NULL;
+ gboolean retval = FALSE;
+ GSList *i;
+ NiceAgent *agent; /* owned */
+
+ /* Closed streams are not readable. */
+ if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
+ return FALSE;
+
+ /* Has the agent disappeared? */
+ agent = g_weak_ref_get (&priv->agent_ref);
+ if (agent == NULL)
+ return FALSE;
+
+ agent_lock ();
+
+ if (!agent_find_component (agent, priv->stream_id, priv->component_id,
+ &_stream, &component)) {
+ g_warning ("Could not find component %u in stream %u", priv->component_id,
+ priv->stream_id);
+ goto done;
+ }
+
+ /* If it’s a reliable agent, see if there’s any pending data in the pseudo-TCP
+ * buffer. */
+ if (agent->reliable && component->tcp != NULL &&
+ pseudo_tcp_socket_get_available_bytes (component->tcp) > 0) {
+ retval = TRUE;
+ goto done;
+ }
+
+ /* Check whether any of the component’s FDs are pollable. */
+ for (i = component->socket_sources; i != NULL; i = i->next) {
+ SocketSource *socket_source = i->data;
+ NiceSocket *socket = socket_source->socket;
+
+ if (g_socket_condition_check (socket->fileno, G_IO_IN) != 0) {
+ retval = TRUE;
+ break;
+ }
+ }
+
+done:
+ agent_unlock ();
+
+ g_object_unref (agent);
+
+ return retval;
+}
+
+static gssize
+nice_input_stream_read_nonblocking (GPollableInputStream *stream, void *buffer,
+ gsize count, GError **error)
+{
+ NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
+ NiceAgent *agent; /* owned */
+ gssize len;
+
+ /* Closed streams are not readable. */
+ if (g_input_stream_is_closed (G_INPUT_STREAM (stream))) {
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+ "Stream is closed.");
+ return -1;
+ }
+
+ /* Has the agent disappeared? */
+ agent = g_weak_ref_get (&priv->agent_ref);
+ if (agent == NULL) {
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+ "Stream is closed due to the NiceAgent being finalised.");
+ return -1;
+ }
+
+ len = nice_agent_recv_nonblocking (agent, priv->stream_id,
+ priv->component_id, (guint8 *) buffer, count, NULL, error);
+
+ g_object_unref (agent);
+
+ return len;
+}
+
+static GSource *
+nice_input_stream_create_source (GPollableInputStream *stream,
+ GCancellable *cancellable)
+{
+ NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
+ GSource *component_source = NULL;
+ Component *component = NULL;
+ Stream *_stream = NULL;
+ NiceAgent *agent; /* owned */
+
+ /* Closed streams cannot have sources. */
+ if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
+ return g_pollable_source_new (G_OBJECT (stream)); /* dummy */
+
+ /* Has the agent disappeared? */
+ agent = g_weak_ref_get (&priv->agent_ref);
+ if (agent == NULL)
+ return g_pollable_source_new (G_OBJECT (stream)); /* dummy */
+
+ agent_lock ();
+
+ /* Grab the socket for this component. */
+ if (!agent_find_component (agent, priv->stream_id, priv->component_id,
+ &_stream, &component)) {
+ g_warning ("Could not find component %u in stream %u", priv->component_id,
+ priv->stream_id);
+ component_source = g_pollable_source_new (G_OBJECT (stream)); /* dummy */
+ goto done;
+ }
+
+ component_source = component_source_new (component, G_OBJECT (stream),
+ G_IO_IN, cancellable);
+
+done:
+ agent_unlock ();
+
+ g_object_unref (agent);
+
+ return component_source;
+}
+
static void
streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
{