summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gst-libs/gst/app/gstappsink.c106
-rw-r--r--gst-libs/gst/app/gstappsrc.c133
2 files changed, 160 insertions, 79 deletions
diff --git a/gst-libs/gst/app/gstappsink.c b/gst-libs/gst/app/gstappsink.c
index cac9821fa..38e56a9d2 100644
--- a/gst-libs/gst/app/gstappsink.c
+++ b/gst-libs/gst/app/gstappsink.c
@@ -80,6 +80,34 @@ typedef enum
APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */
} GstAppSinkWaitStatus;
+typedef struct
+{
+ GstAppSinkCallbacks callbacks;
+ gpointer user_data;
+ GDestroyNotify destroy_notify;
+ gint ref_count;
+} Callbacks;
+
+static Callbacks *
+callbacks_ref (Callbacks * callbacks)
+{
+ g_atomic_int_inc (&callbacks->ref_count);
+
+ return callbacks;
+}
+
+static void
+callbacks_unref (Callbacks * callbacks)
+{
+ if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
+ return;
+
+ if (callbacks->destroy_notify)
+ callbacks->destroy_notify (callbacks->user_data);
+
+ g_free (callbacks);
+}
+
struct _GstAppSinkPrivate
{
GstCaps *caps;
@@ -104,9 +132,7 @@ struct _GstAppSinkPrivate
gboolean is_eos;
gboolean buffer_lists_supported;
- GstAppSinkCallbacks callbacks;
- gpointer user_data;
- GDestroyNotify notify;
+ Callbacks *callbacks;
GstSample *sample;
};
@@ -476,21 +502,18 @@ gst_app_sink_dispose (GObject * obj)
GstAppSink *appsink = GST_APP_SINK_CAST (obj);
GstAppSinkPrivate *priv = appsink->priv;
GstMiniObject *queue_obj;
+ Callbacks *callbacks = NULL;
GST_OBJECT_LOCK (appsink);
if (priv->caps) {
gst_caps_unref (priv->caps);
priv->caps = NULL;
}
- if (priv->notify) {
- priv->notify (priv->user_data);
- }
- priv->user_data = NULL;
- priv->notify = NULL;
-
GST_OBJECT_UNLOCK (appsink);
g_mutex_lock (&priv->mutex);
+ if (priv->callbacks)
+ callbacks = g_steal_pointer (&priv->callbacks);
while ((queue_obj = gst_queue_array_pop_head (priv->queue)))
gst_mini_object_unref (queue_obj);
gst_buffer_replace (&priv->preroll_buffer, NULL);
@@ -502,6 +525,8 @@ gst_app_sink_dispose (GObject * obj)
}
g_mutex_unlock (&priv->mutex);
+ g_clear_pointer (&callbacks, callbacks_unref);
+
G_OBJECT_CLASS (parent_class)->dispose (obj);
}
@@ -715,6 +740,7 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
break;
case GST_EVENT_EOS:{
gboolean emit = TRUE;
+ Callbacks *callbacks = NULL;
g_mutex_lock (&priv->mutex);
GST_DEBUG_OBJECT (appsink, "receiving EOS");
@@ -748,14 +774,19 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
}
if (priv->flushing)
emit = FALSE;
+
+ if (emit && priv->callbacks)
+ callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex);
if (emit) {
/* emit EOS now */
- if (priv->callbacks.eos)
- priv->callbacks.eos (appsink, priv->user_data);
+ if (callbacks && callbacks->callbacks.eos)
+ callbacks->callbacks.eos (appsink, callbacks->user_data);
else
g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_EOS], 0);
+
+ g_clear_pointer (&callbacks, callbacks_unref);
}
break;
@@ -784,6 +815,7 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer)
GstAppSink *appsink = GST_APP_SINK_CAST (psink);
GstAppSinkPrivate *priv = appsink->priv;
gboolean emit;
+ Callbacks *callbacks = NULL;
g_mutex_lock (&priv->mutex);
if (priv->flushing)
@@ -796,10 +828,12 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer)
g_cond_signal (&priv->cond);
emit = priv->emit_signals;
+ if (priv->callbacks)
+ callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex);
- if (priv->callbacks.new_preroll) {
- res = priv->callbacks.new_preroll (appsink, priv->user_data);
+ if (callbacks && callbacks->callbacks.new_preroll) {
+ res = callbacks->callbacks.new_preroll (appsink, callbacks->user_data);
} else {
res = GST_FLOW_OK;
if (emit)
@@ -807,6 +841,8 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer)
&res);
}
+ g_clear_pointer (&callbacks, callbacks_unref);
+
return res;
flushing:
@@ -870,6 +906,7 @@ gst_app_sink_render_common (GstBaseSink * psink, GstMiniObject * data,
GstAppSink *appsink = GST_APP_SINK_CAST (psink);
GstAppSinkPrivate *priv = appsink->priv;
gboolean emit;
+ Callbacks *callbacks = NULL;
restart:
g_mutex_lock (&priv->mutex);
@@ -929,15 +966,19 @@ restart:
g_cond_signal (&priv->cond);
emit = priv->emit_signals;
+ if (priv->callbacks)
+ callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex);
- if (priv->callbacks.new_sample) {
- ret = priv->callbacks.new_sample (appsink, priv->user_data);
+ if (callbacks && callbacks->callbacks.new_sample) {
+ ret = callbacks->callbacks.new_sample (appsink, callbacks->user_data);
} else {
ret = GST_FLOW_OK;
if (emit)
g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_NEW_SAMPLE], 0, &ret);
}
+ g_clear_pointer (&callbacks, callbacks_unref);
+
return ret;
flushing:
@@ -1721,12 +1762,15 @@ not_started:
*
* If callbacks are installed, no signals will be emitted for performance
* reasons.
+ *
+ * Before 1.16.3 it was not possible to change the callbacks in a thread-safe
+ * way.
*/
void
gst_app_sink_set_callbacks (GstAppSink * appsink,
GstAppSinkCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
{
- GDestroyNotify old_notify;
+ Callbacks *old_callbacks, *new_callbacks = NULL;
GstAppSinkPrivate *priv;
g_return_if_fail (GST_IS_APP_SINK (appsink));
@@ -1734,26 +1778,20 @@ gst_app_sink_set_callbacks (GstAppSink * appsink,
priv = appsink->priv;
- GST_OBJECT_LOCK (appsink);
- old_notify = priv->notify;
-
- if (old_notify) {
- gpointer old_data;
-
- old_data = priv->user_data;
-
- priv->user_data = NULL;
- priv->notify = NULL;
- GST_OBJECT_UNLOCK (appsink);
+ if (callbacks) {
+ new_callbacks = g_new0 (Callbacks, 1);
+ new_callbacks->callbacks = *callbacks;
+ new_callbacks->user_data = user_data;
+ new_callbacks->destroy_notify = notify;
+ new_callbacks->ref_count = 1;
+ }
- old_notify (old_data);
+ g_mutex_lock (&priv->mutex);
+ old_callbacks = g_steal_pointer (&priv->callbacks);
+ priv->callbacks = g_steal_pointer (&new_callbacks);
+ g_mutex_unlock (&priv->mutex);
- GST_OBJECT_LOCK (appsink);
- }
- priv->callbacks = *callbacks;
- priv->user_data = user_data;
- priv->notify = notify;
- GST_OBJECT_UNLOCK (appsink);
+ g_clear_pointer (&old_callbacks, callbacks_unref);
}
/*** GSTURIHANDLER INTERFACE *************************************************/
diff --git a/gst-libs/gst/app/gstappsrc.c b/gst-libs/gst/app/gstappsrc.c
index 66ad05e1e..c937c8d42 100644
--- a/gst-libs/gst/app/gstappsrc.c
+++ b/gst-libs/gst/app/gstappsrc.c
@@ -108,6 +108,35 @@ typedef enum
APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */
} GstAppSrcWaitStatus;
+typedef struct
+{
+ GstAppSrcCallbacks callbacks;
+ gpointer user_data;
+ GDestroyNotify destroy_notify;
+ gint ref_count;
+} Callbacks;
+
+static Callbacks *
+callbacks_ref (Callbacks * callbacks)
+{
+ g_atomic_int_inc (&callbacks->ref_count);
+
+ return callbacks;
+}
+
+static void
+callbacks_unref (Callbacks * callbacks)
+{
+ if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
+ return;
+
+ if (callbacks->destroy_notify)
+ callbacks->destroy_notify (callbacks->user_data);
+
+ g_free (callbacks);
+}
+
+
struct _GstAppSrcPrivate
{
GCond cond;
@@ -138,9 +167,7 @@ struct _GstAppSrcPrivate
gboolean emit_signals;
guint min_percent;
- GstAppSrcCallbacks callbacks;
- gpointer user_data;
- GDestroyNotify notify;
+ Callbacks *callbacks;
};
GST_DEBUG_CATEGORY_STATIC (app_src_debug);
@@ -621,6 +648,7 @@ gst_app_src_dispose (GObject * obj)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
GstAppSrcPrivate *priv = appsrc->priv;
+ Callbacks *callbacks = NULL;
GST_OBJECT_LOCK (appsrc);
if (priv->current_caps) {
@@ -631,18 +659,16 @@ gst_app_src_dispose (GObject * obj)
gst_caps_unref (priv->last_caps);
priv->last_caps = NULL;
}
- if (priv->notify) {
- priv->notify (priv->user_data);
- }
- priv->user_data = NULL;
- priv->notify = NULL;
-
GST_OBJECT_UNLOCK (appsrc);
g_mutex_lock (&priv->mutex);
+ if (priv->callbacks)
+ callbacks = g_steal_pointer (&priv->callbacks);
gst_app_src_flush_queued (appsrc, FALSE);
g_mutex_unlock (&priv->mutex);
+ g_clear_pointer (&callbacks, callbacks_unref);
+
G_OBJECT_CLASS (parent_class)->dispose (obj);
}
@@ -997,6 +1023,8 @@ gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
GstAppSrcPrivate *priv = appsrc->priv;
gint64 desired_position;
gboolean res = FALSE;
+ gboolean emit;
+ Callbacks *callbacks = NULL;
desired_position = segment->position;
@@ -1007,20 +1035,23 @@ gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
desired_position, gst_format_get_name (segment->format));
- if (priv->callbacks.seek_data)
- res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data);
- else {
- gboolean emit;
-
- g_mutex_lock (&priv->mutex);
- emit = priv->emit_signals;
- g_mutex_unlock (&priv->mutex);
+ g_mutex_lock (&priv->mutex);
+ emit = priv->emit_signals;
+ if (priv->callbacks)
+ callbacks = callbacks_ref (priv->callbacks);
+ g_mutex_unlock (&priv->mutex);
- if (emit)
- g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
- desired_position, &res);
+ if (callbacks && callbacks->callbacks.seek_data) {
+ res =
+ callbacks->callbacks.seek_data (appsrc, desired_position,
+ callbacks->user_data);
+ } else if (emit) {
+ g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
+ desired_position, &res);
}
+ g_clear_pointer (&callbacks, callbacks_unref);
+
if (res) {
GST_DEBUG_OBJECT (appsrc, "flushing queue");
g_mutex_lock (&priv->mutex);
@@ -1041,20 +1072,25 @@ gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
gboolean res = FALSE;
gboolean emit;
GstAppSrcPrivate *priv = appsrc->priv;
+ Callbacks *callbacks = NULL;
emit = priv->emit_signals;
+ if (priv->callbacks)
+ callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex);
GST_DEBUG_OBJECT (appsrc,
"we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
priv->offset, offset);
- if (priv->callbacks.seek_data)
- res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
+ if (callbacks && callbacks->callbacks.seek_data)
+ res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
offset, &res);
+ g_clear_pointer (&callbacks, callbacks_unref);
+
g_mutex_lock (&priv->mutex);
return res;
@@ -1067,17 +1103,22 @@ gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
{
gboolean emit;
GstAppSrcPrivate *priv = appsrc->priv;
+ Callbacks *callbacks = NULL;
emit = priv->emit_signals;
+ if (priv->callbacks)
+ callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex);
/* we have no data, we need some. We fire the signal with the size hint. */
- if (priv->callbacks.need_data)
- priv->callbacks.need_data (appsrc, size, priv->user_data);
+ if (callbacks && callbacks->callbacks.need_data)
+ callbacks->callbacks.need_data (appsrc, size, callbacks->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
NULL);
+ g_clear_pointer (&callbacks, callbacks_unref);
+
g_mutex_lock (&priv->mutex);
/* we can be flushing now because we released the lock */
}
@@ -1817,18 +1858,23 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
priv->queued_bytes, priv->max_bytes);
if (first) {
+ Callbacks *callbacks = NULL;
gboolean emit;
emit = priv->emit_signals;
+ if (priv->callbacks)
+ callbacks = callbacks_ref (priv->callbacks);
/* only signal on the first push */
g_mutex_unlock (&priv->mutex);
- if (priv->callbacks.enough_data)
- priv->callbacks.enough_data (appsrc, priv->user_data);
+ if (callbacks && callbacks->callbacks.enough_data)
+ callbacks->callbacks.enough_data (appsrc, callbacks->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
NULL);
+ g_clear_pointer (&callbacks, callbacks_unref);
+
g_mutex_lock (&priv->mutex);
/* continue to check for flushing/eos after releasing the lock */
first = FALSE;
@@ -2081,12 +2127,15 @@ flushing:
*
* If callbacks are installed, no signals will be emitted for performance
* reasons.
+ *
+ * Before 1.16.3 it was not possible to change the callbacks in a thread-safe
+ * way.
*/
void
gst_app_src_set_callbacks (GstAppSrc * appsrc,
GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
{
- GDestroyNotify old_notify;
+ Callbacks *old_callbacks, *new_callbacks = NULL;
GstAppSrcPrivate *priv;
g_return_if_fail (GST_IS_APP_SRC (appsrc));
@@ -2094,26 +2143,20 @@ gst_app_src_set_callbacks (GstAppSrc * appsrc,
priv = appsrc->priv;
- GST_OBJECT_LOCK (appsrc);
- old_notify = priv->notify;
-
- if (old_notify) {
- gpointer old_data;
-
- old_data = priv->user_data;
-
- priv->user_data = NULL;
- priv->notify = NULL;
- GST_OBJECT_UNLOCK (appsrc);
+ if (callbacks) {
+ new_callbacks = g_new0 (Callbacks, 1);
+ new_callbacks->callbacks = *callbacks;
+ new_callbacks->user_data = user_data;
+ new_callbacks->destroy_notify = notify;
+ new_callbacks->ref_count = 1;
+ }
- old_notify (old_data);
+ g_mutex_lock (&priv->mutex);
+ old_callbacks = g_steal_pointer (&priv->callbacks);
+ priv->callbacks = g_steal_pointer (&new_callbacks);
+ g_mutex_unlock (&priv->mutex);
- GST_OBJECT_LOCK (appsrc);
- }
- priv->callbacks = *callbacks;
- priv->user_data = user_data;
- priv->notify = notify;
- GST_OBJECT_UNLOCK (appsrc);
+ g_clear_pointer (&old_callbacks, callbacks_unref);
}
/*** GSTURIHANDLER INTERFACE *************************************************/