summaryrefslogtreecommitdiff
path: root/gst
diff options
context:
space:
mode:
authorYouness Alaoui <kakaroto@kakaroto.homelinux.net>2014-08-08 15:32:13 -0400
committerOlivier CrĂȘte <olivier.crete@collabora.com>2014-08-13 19:31:01 -0400
commit38351e4a86bc18191198564de7731d6f51e7b787 (patch)
treee78e5272457ee55293cece82ec4d7376e379530c /gst
parentb370a95e10c8d14776304227c592f80abfe5eac4 (diff)
downloadlibnice-38351e4a86bc18191198564de7731d6f51e7b787.tar.gz
nicesink: Block until component is writable if in reliable mode
Diffstat (limited to 'gst')
-rw-r--r--gst/gstnicesink.c113
-rw-r--r--gst/gstnicesink.h4
2 files changed, 107 insertions, 10 deletions
diff --git a/gst/gstnicesink.c b/gst/gstnicesink.c
index 6580c8e..5433a30 100644
--- a/gst/gstnicesink.c
+++ b/gst/gstnicesink.c
@@ -49,6 +49,19 @@ gst_nice_sink_render (
GstBaseSink *basesink,
GstBuffer *buffer);
+static gboolean
+gst_nice_sink_unlock (GstBaseSink *basesink);
+
+static gboolean
+gst_nice_sink_unlock_stop (GstBaseSink *basesink);
+
+static void
+_reliable_transport_writable (
+ NiceAgent *agent,
+ guint stream_id,
+ guint component_id,
+ GstNiceSink *sink);
+
static void
gst_nice_sink_set_property (
GObject *object,
@@ -99,6 +112,8 @@ gst_nice_sink_class_init (GstNiceSinkClass *klass)
gstbasesink_class = (GstBaseSinkClass *) klass;
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_nice_sink_render);
+ gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_nice_sink_unlock);
+ gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_nice_sink_unlock_stop);
gobject_class = (GObjectClass *) klass;
gobject_class->set_property = gst_nice_sink_set_property;
@@ -153,29 +168,89 @@ gst_nice_sink_class_init (GstNiceSinkClass *klass)
static void
gst_nice_sink_init (GstNiceSink *sink)
{
+ g_cond_init (&sink->writable_cond);
+}
+
+static void
+_reliable_transport_writable (NiceAgent *agent, guint stream_id,
+ guint component_id, GstNiceSink *sink)
+{
+ GST_OBJECT_LOCK (sink);
+ if (stream_id == sink->stream_id && component_id == sink->component_id) {
+ g_cond_broadcast (&sink->writable_cond);
+ }
+ GST_OBJECT_UNLOCK (sink);
}
static GstFlowReturn
gst_nice_sink_render (GstBaseSink *basesink, GstBuffer *buffer)
{
GstNiceSink *nicesink = GST_NICE_SINK (basesink);
+ guint written = 0;
+ gint ret;
+ gchar *data = NULL;
+ guint size = 0;
+ GstFlowReturn flow_ret = GST_FLOW_OK;
#if GST_CHECK_VERSION (1,0,0)
GstMapInfo info;
gst_buffer_map (buffer, &info, GST_MAP_READ);
+ data = (gchar *) info.data;
+ size = info.size;
+#else
+ data = (gchar *) GST_BUFFER_DATA (buffer);
+ size = GST_BUFFER_SIZE (buffer);
+#endif
- nice_agent_send (nicesink->agent, nicesink->stream_id,
- nicesink->component_id, info.size, (gchar *) info.data);
+ GST_OBJECT_LOCK (nicesink);
+ do {
+ ret = nice_agent_send (nicesink->agent, nicesink->stream_id,
+ nicesink->component_id, size - written, data + written);
+ if (ret > 0)
+ written += ret;
- gst_buffer_unmap (buffer, &info);
+ if (nicesink->reliable && written < size)
+ g_cond_wait (&nicesink->writable_cond, GST_OBJECT_GET_LOCK (nicesink));
+ if (nicesink->flushing) {
+#if GST_CHECK_VERSION (1,0,0)
+ flow_ret = GST_FLOW_FLUSHING;
#else
- nice_agent_send (nicesink->agent, nicesink->stream_id,
- nicesink->component_id, GST_BUFFER_SIZE (buffer),
- (gchar *) GST_BUFFER_DATA (buffer));
+ flow_ret = GST_FLOW_WRONG_STATE;
#endif
+ break;
+ }
+ } while (nicesink->reliable && written < size);
+ GST_OBJECT_UNLOCK (nicesink);
- return GST_FLOW_OK;
+#if GST_CHECK_VERSION (1,0,0)
+ gst_buffer_unmap (buffer, &info);
+#endif
+
+ return flow_ret;
+}
+
+static gboolean gst_nice_sink_unlock (GstBaseSink *basesink)
+{
+ GstNiceSink *nicesink = GST_NICE_SINK (basesink);
+
+ GST_OBJECT_LOCK (nicesink);
+ nicesink->flushing = TRUE;
+ g_cond_broadcast (&nicesink->writable_cond);
+ GST_OBJECT_UNLOCK (nicesink);
+
+ return TRUE;
+}
+
+static gboolean gst_nice_sink_unlock_stop (GstBaseSink *basesink)
+{
+ GstNiceSink *nicesink = GST_NICE_SINK (basesink);
+
+ GST_OBJECT_LOCK (nicesink);
+ nicesink->flushing = FALSE;
+ GST_OBJECT_UNLOCK (nicesink);
+
+ return TRUE;
}
@@ -184,10 +259,14 @@ gst_nice_sink_dispose (GObject *object)
{
GstNiceSink *sink = GST_NICE_SINK (object);
- if (sink->agent)
+ if (sink->agent) {
+ g_signal_handler_disconnect (sink->agent, sink->writable_id);
g_object_unref (sink->agent);
+ }
sink->agent = NULL;
+ g_cond_clear (&sink->writable_cond);
+
G_OBJECT_CLASS (gst_nice_sink_parent_class)->dispose (object);
}
@@ -203,19 +282,29 @@ gst_nice_sink_set_property (
switch (prop_id)
{
case PROP_AGENT:
- if (sink->agent)
+ if (sink->agent) {
GST_ERROR_OBJECT (object,
"Changing the agent on a nice sink not allowed");
- else
+ } else {
sink->agent = g_value_dup_object (value);
+ g_object_get (sink->agent, "reliable", &sink->reliable, NULL);
+ if (sink->reliable)
+ sink->writable_id = g_signal_connect (sink->agent,
+ "reliable-transport-writable",
+ (GCallback) _reliable_transport_writable, sink);
+ }
break;
case PROP_STREAM:
+ GST_OBJECT_LOCK (sink);
sink->stream_id = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (sink);
break;
case PROP_COMPONENT:
+ GST_OBJECT_LOCK (sink);
sink->component_id = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (sink);
break;
default:
@@ -240,11 +329,15 @@ gst_nice_sink_get_property (
break;
case PROP_STREAM:
+ GST_OBJECT_LOCK (sink);
g_value_set_uint (value, sink->stream_id);
+ GST_OBJECT_UNLOCK (sink);
break;
case PROP_COMPONENT:
+ GST_OBJECT_LOCK (sink);
g_value_set_uint (value, sink->component_id);
+ GST_OBJECT_UNLOCK (sink);
break;
default:
diff --git a/gst/gstnicesink.h b/gst/gstnicesink.h
index 8ec0641..9529f64 100644
--- a/gst/gstnicesink.h
+++ b/gst/gstnicesink.h
@@ -65,6 +65,10 @@ struct _GstNiceSink
NiceAgent *agent;
guint stream_id;
guint component_id;
+ gboolean reliable;
+ GCond writable_cond;
+ gulong writable_id;
+ gboolean flushing;
};
typedef struct _GstNiceSinkClass GstNiceSinkClass;