diff options
-rw-r--r-- | gst-libs/gst/app/gstappsink.c | 106 | ||||
-rw-r--r-- | gst-libs/gst/app/gstappsrc.c | 133 |
2 files changed, 160 insertions, 79 deletions
diff --git a/gst-libs/gst/app/gstappsink.c b/gst-libs/gst/app/gstappsink.c index cac9821fa..38e56a9d2 100644 --- a/gst-libs/gst/app/gstappsink.c +++ b/gst-libs/gst/app/gstappsink.c @@ -80,6 +80,34 @@ typedef enum APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */ } GstAppSinkWaitStatus; +typedef struct +{ + GstAppSinkCallbacks callbacks; + gpointer user_data; + GDestroyNotify destroy_notify; + gint ref_count; +} Callbacks; + +static Callbacks * +callbacks_ref (Callbacks * callbacks) +{ + g_atomic_int_inc (&callbacks->ref_count); + + return callbacks; +} + +static void +callbacks_unref (Callbacks * callbacks) +{ + if (!g_atomic_int_dec_and_test (&callbacks->ref_count)) + return; + + if (callbacks->destroy_notify) + callbacks->destroy_notify (callbacks->user_data); + + g_free (callbacks); +} + struct _GstAppSinkPrivate { GstCaps *caps; @@ -104,9 +132,7 @@ struct _GstAppSinkPrivate gboolean is_eos; gboolean buffer_lists_supported; - GstAppSinkCallbacks callbacks; - gpointer user_data; - GDestroyNotify notify; + Callbacks *callbacks; GstSample *sample; }; @@ -476,21 +502,18 @@ gst_app_sink_dispose (GObject * obj) GstAppSink *appsink = GST_APP_SINK_CAST (obj); GstAppSinkPrivate *priv = appsink->priv; GstMiniObject *queue_obj; + Callbacks *callbacks = NULL; GST_OBJECT_LOCK (appsink); if (priv->caps) { gst_caps_unref (priv->caps); priv->caps = NULL; } - if (priv->notify) { - priv->notify (priv->user_data); - } - priv->user_data = NULL; - priv->notify = NULL; - GST_OBJECT_UNLOCK (appsink); g_mutex_lock (&priv->mutex); + if (priv->callbacks) + callbacks = g_steal_pointer (&priv->callbacks); while ((queue_obj = gst_queue_array_pop_head (priv->queue))) gst_mini_object_unref (queue_obj); gst_buffer_replace (&priv->preroll_buffer, NULL); @@ -502,6 +525,8 @@ gst_app_sink_dispose (GObject * obj) } g_mutex_unlock (&priv->mutex); + g_clear_pointer (&callbacks, callbacks_unref); + G_OBJECT_CLASS (parent_class)->dispose (obj); } @@ -715,6 +740,7 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event) break; case GST_EVENT_EOS:{ gboolean emit = TRUE; + Callbacks *callbacks = NULL; g_mutex_lock (&priv->mutex); GST_DEBUG_OBJECT (appsink, "receiving EOS"); @@ -748,14 +774,19 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event) } if (priv->flushing) emit = FALSE; + + if (emit && priv->callbacks) + callbacks = callbacks_ref (priv->callbacks); g_mutex_unlock (&priv->mutex); if (emit) { /* emit EOS now */ - if (priv->callbacks.eos) - priv->callbacks.eos (appsink, priv->user_data); + if (callbacks && callbacks->callbacks.eos) + callbacks->callbacks.eos (appsink, callbacks->user_data); else g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_EOS], 0); + + g_clear_pointer (&callbacks, callbacks_unref); } break; @@ -784,6 +815,7 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer) GstAppSink *appsink = GST_APP_SINK_CAST (psink); GstAppSinkPrivate *priv = appsink->priv; gboolean emit; + Callbacks *callbacks = NULL; g_mutex_lock (&priv->mutex); if (priv->flushing) @@ -796,10 +828,12 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer) g_cond_signal (&priv->cond); emit = priv->emit_signals; + if (priv->callbacks) + callbacks = callbacks_ref (priv->callbacks); g_mutex_unlock (&priv->mutex); - if (priv->callbacks.new_preroll) { - res = priv->callbacks.new_preroll (appsink, priv->user_data); + if (callbacks && callbacks->callbacks.new_preroll) { + res = callbacks->callbacks.new_preroll (appsink, callbacks->user_data); } else { res = GST_FLOW_OK; if (emit) @@ -807,6 +841,8 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer) &res); } + g_clear_pointer (&callbacks, callbacks_unref); + return res; flushing: @@ -870,6 +906,7 @@ gst_app_sink_render_common (GstBaseSink * psink, GstMiniObject * data, GstAppSink *appsink = GST_APP_SINK_CAST (psink); GstAppSinkPrivate *priv = appsink->priv; gboolean emit; + Callbacks *callbacks = NULL; restart: g_mutex_lock (&priv->mutex); @@ -929,15 +966,19 @@ restart: g_cond_signal (&priv->cond); emit = priv->emit_signals; + if (priv->callbacks) + callbacks = callbacks_ref (priv->callbacks); g_mutex_unlock (&priv->mutex); - if (priv->callbacks.new_sample) { - ret = priv->callbacks.new_sample (appsink, priv->user_data); + if (callbacks && callbacks->callbacks.new_sample) { + ret = callbacks->callbacks.new_sample (appsink, callbacks->user_data); } else { ret = GST_FLOW_OK; if (emit) g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_NEW_SAMPLE], 0, &ret); } + g_clear_pointer (&callbacks, callbacks_unref); + return ret; flushing: @@ -1721,12 +1762,15 @@ not_started: * * If callbacks are installed, no signals will be emitted for performance * reasons. + * + * Before 1.16.3 it was not possible to change the callbacks in a thread-safe + * way. */ void gst_app_sink_set_callbacks (GstAppSink * appsink, GstAppSinkCallbacks * callbacks, gpointer user_data, GDestroyNotify notify) { - GDestroyNotify old_notify; + Callbacks *old_callbacks, *new_callbacks = NULL; GstAppSinkPrivate *priv; g_return_if_fail (GST_IS_APP_SINK (appsink)); @@ -1734,26 +1778,20 @@ gst_app_sink_set_callbacks (GstAppSink * appsink, priv = appsink->priv; - GST_OBJECT_LOCK (appsink); - old_notify = priv->notify; - - if (old_notify) { - gpointer old_data; - - old_data = priv->user_data; - - priv->user_data = NULL; - priv->notify = NULL; - GST_OBJECT_UNLOCK (appsink); + if (callbacks) { + new_callbacks = g_new0 (Callbacks, 1); + new_callbacks->callbacks = *callbacks; + new_callbacks->user_data = user_data; + new_callbacks->destroy_notify = notify; + new_callbacks->ref_count = 1; + } - old_notify (old_data); + g_mutex_lock (&priv->mutex); + old_callbacks = g_steal_pointer (&priv->callbacks); + priv->callbacks = g_steal_pointer (&new_callbacks); + g_mutex_unlock (&priv->mutex); - GST_OBJECT_LOCK (appsink); - } - priv->callbacks = *callbacks; - priv->user_data = user_data; - priv->notify = notify; - GST_OBJECT_UNLOCK (appsink); + g_clear_pointer (&old_callbacks, callbacks_unref); } /*** GSTURIHANDLER INTERFACE *************************************************/ diff --git a/gst-libs/gst/app/gstappsrc.c b/gst-libs/gst/app/gstappsrc.c index 66ad05e1e..c937c8d42 100644 --- a/gst-libs/gst/app/gstappsrc.c +++ b/gst-libs/gst/app/gstappsrc.c @@ -108,6 +108,35 @@ typedef enum APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */ } GstAppSrcWaitStatus; +typedef struct +{ + GstAppSrcCallbacks callbacks; + gpointer user_data; + GDestroyNotify destroy_notify; + gint ref_count; +} Callbacks; + +static Callbacks * +callbacks_ref (Callbacks * callbacks) +{ + g_atomic_int_inc (&callbacks->ref_count); + + return callbacks; +} + +static void +callbacks_unref (Callbacks * callbacks) +{ + if (!g_atomic_int_dec_and_test (&callbacks->ref_count)) + return; + + if (callbacks->destroy_notify) + callbacks->destroy_notify (callbacks->user_data); + + g_free (callbacks); +} + + struct _GstAppSrcPrivate { GCond cond; @@ -138,9 +167,7 @@ struct _GstAppSrcPrivate gboolean emit_signals; guint min_percent; - GstAppSrcCallbacks callbacks; - gpointer user_data; - GDestroyNotify notify; + Callbacks *callbacks; }; GST_DEBUG_CATEGORY_STATIC (app_src_debug); @@ -621,6 +648,7 @@ gst_app_src_dispose (GObject * obj) { GstAppSrc *appsrc = GST_APP_SRC_CAST (obj); GstAppSrcPrivate *priv = appsrc->priv; + Callbacks *callbacks = NULL; GST_OBJECT_LOCK (appsrc); if (priv->current_caps) { @@ -631,18 +659,16 @@ gst_app_src_dispose (GObject * obj) gst_caps_unref (priv->last_caps); priv->last_caps = NULL; } - if (priv->notify) { - priv->notify (priv->user_data); - } - priv->user_data = NULL; - priv->notify = NULL; - GST_OBJECT_UNLOCK (appsrc); g_mutex_lock (&priv->mutex); + if (priv->callbacks) + callbacks = g_steal_pointer (&priv->callbacks); gst_app_src_flush_queued (appsrc, FALSE); g_mutex_unlock (&priv->mutex); + g_clear_pointer (&callbacks, callbacks_unref); + G_OBJECT_CLASS (parent_class)->dispose (obj); } @@ -997,6 +1023,8 @@ gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment) GstAppSrcPrivate *priv = appsrc->priv; gint64 desired_position; gboolean res = FALSE; + gboolean emit; + Callbacks *callbacks = NULL; desired_position = segment->position; @@ -1007,20 +1035,23 @@ gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment) GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s", desired_position, gst_format_get_name (segment->format)); - if (priv->callbacks.seek_data) - res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data); - else { - gboolean emit; - - g_mutex_lock (&priv->mutex); - emit = priv->emit_signals; - g_mutex_unlock (&priv->mutex); + g_mutex_lock (&priv->mutex); + emit = priv->emit_signals; + if (priv->callbacks) + callbacks = callbacks_ref (priv->callbacks); + g_mutex_unlock (&priv->mutex); - if (emit) - g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0, - desired_position, &res); + if (callbacks && callbacks->callbacks.seek_data) { + res = + callbacks->callbacks.seek_data (appsrc, desired_position, + callbacks->user_data); + } else if (emit) { + g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0, + desired_position, &res); } + g_clear_pointer (&callbacks, callbacks_unref); + if (res) { GST_DEBUG_OBJECT (appsrc, "flushing queue"); g_mutex_lock (&priv->mutex); @@ -1041,20 +1072,25 @@ gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset) gboolean res = FALSE; gboolean emit; GstAppSrcPrivate *priv = appsrc->priv; + Callbacks *callbacks = NULL; emit = priv->emit_signals; + if (priv->callbacks) + callbacks = callbacks_ref (priv->callbacks); g_mutex_unlock (&priv->mutex); GST_DEBUG_OBJECT (appsrc, "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT, priv->offset, offset); - if (priv->callbacks.seek_data) - res = priv->callbacks.seek_data (appsrc, offset, priv->user_data); + if (callbacks && callbacks->callbacks.seek_data) + res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data); else if (emit) g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0, offset, &res); + g_clear_pointer (&callbacks, callbacks_unref); + g_mutex_lock (&priv->mutex); return res; @@ -1067,17 +1103,22 @@ gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size) { gboolean emit; GstAppSrcPrivate *priv = appsrc->priv; + Callbacks *callbacks = NULL; emit = priv->emit_signals; + if (priv->callbacks) + callbacks = callbacks_ref (priv->callbacks); g_mutex_unlock (&priv->mutex); /* we have no data, we need some. We fire the signal with the size hint. */ - if (priv->callbacks.need_data) - priv->callbacks.need_data (appsrc, size, priv->user_data); + if (callbacks && callbacks->callbacks.need_data) + callbacks->callbacks.need_data (appsrc, size, callbacks->user_data); else if (emit) g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size, NULL); + g_clear_pointer (&callbacks, callbacks_unref); + g_mutex_lock (&priv->mutex); /* we can be flushing now because we released the lock */ } @@ -1817,18 +1858,23 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer, priv->queued_bytes, priv->max_bytes); if (first) { + Callbacks *callbacks = NULL; gboolean emit; emit = priv->emit_signals; + if (priv->callbacks) + callbacks = callbacks_ref (priv->callbacks); /* only signal on the first push */ g_mutex_unlock (&priv->mutex); - if (priv->callbacks.enough_data) - priv->callbacks.enough_data (appsrc, priv->user_data); + if (callbacks && callbacks->callbacks.enough_data) + callbacks->callbacks.enough_data (appsrc, callbacks->user_data); else if (emit) g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0, NULL); + g_clear_pointer (&callbacks, callbacks_unref); + g_mutex_lock (&priv->mutex); /* continue to check for flushing/eos after releasing the lock */ first = FALSE; @@ -2081,12 +2127,15 @@ flushing: * * If callbacks are installed, no signals will be emitted for performance * reasons. + * + * Before 1.16.3 it was not possible to change the callbacks in a thread-safe + * way. */ void gst_app_src_set_callbacks (GstAppSrc * appsrc, GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify) { - GDestroyNotify old_notify; + Callbacks *old_callbacks, *new_callbacks = NULL; GstAppSrcPrivate *priv; g_return_if_fail (GST_IS_APP_SRC (appsrc)); @@ -2094,26 +2143,20 @@ gst_app_src_set_callbacks (GstAppSrc * appsrc, priv = appsrc->priv; - GST_OBJECT_LOCK (appsrc); - old_notify = priv->notify; - - if (old_notify) { - gpointer old_data; - - old_data = priv->user_data; - - priv->user_data = NULL; - priv->notify = NULL; - GST_OBJECT_UNLOCK (appsrc); + if (callbacks) { + new_callbacks = g_new0 (Callbacks, 1); + new_callbacks->callbacks = *callbacks; + new_callbacks->user_data = user_data; + new_callbacks->destroy_notify = notify; + new_callbacks->ref_count = 1; + } - old_notify (old_data); + g_mutex_lock (&priv->mutex); + old_callbacks = g_steal_pointer (&priv->callbacks); + priv->callbacks = g_steal_pointer (&new_callbacks); + g_mutex_unlock (&priv->mutex); - GST_OBJECT_LOCK (appsrc); - } - priv->callbacks = *callbacks; - priv->user_data = user_data; - priv->notify = notify; - GST_OBJECT_UNLOCK (appsrc); + g_clear_pointer (&old_callbacks, callbacks_unref); } /*** GSTURIHANDLER INTERFACE *************************************************/ |