summaryrefslogtreecommitdiff
path: root/gst-libs/gst/app/gstappsrc.c
diff options
context:
space:
mode:
Diffstat (limited to 'gst-libs/gst/app/gstappsrc.c')
-rw-r--r--gst-libs/gst/app/gstappsrc.c133
1 files changed, 88 insertions, 45 deletions
diff --git a/gst-libs/gst/app/gstappsrc.c b/gst-libs/gst/app/gstappsrc.c
index 66ad05e1e..c937c8d42 100644
--- a/gst-libs/gst/app/gstappsrc.c
+++ b/gst-libs/gst/app/gstappsrc.c
@@ -108,6 +108,35 @@ typedef enum
APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */
} GstAppSrcWaitStatus;
+typedef struct
+{
+ GstAppSrcCallbacks callbacks;
+ gpointer user_data;
+ GDestroyNotify destroy_notify;
+ gint ref_count;
+} Callbacks;
+
+static Callbacks *
+callbacks_ref (Callbacks * callbacks)
+{
+ g_atomic_int_inc (&callbacks->ref_count);
+
+ return callbacks;
+}
+
+static void
+callbacks_unref (Callbacks * callbacks)
+{
+ if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
+ return;
+
+ if (callbacks->destroy_notify)
+ callbacks->destroy_notify (callbacks->user_data);
+
+ g_free (callbacks);
+}
+
+
struct _GstAppSrcPrivate
{
GCond cond;
@@ -138,9 +167,7 @@ struct _GstAppSrcPrivate
gboolean emit_signals;
guint min_percent;
- GstAppSrcCallbacks callbacks;
- gpointer user_data;
- GDestroyNotify notify;
+ Callbacks *callbacks;
};
GST_DEBUG_CATEGORY_STATIC (app_src_debug);
@@ -621,6 +648,7 @@ gst_app_src_dispose (GObject * obj)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
GstAppSrcPrivate *priv = appsrc->priv;
+ Callbacks *callbacks = NULL;
GST_OBJECT_LOCK (appsrc);
if (priv->current_caps) {
@@ -631,18 +659,16 @@ gst_app_src_dispose (GObject * obj)
gst_caps_unref (priv->last_caps);
priv->last_caps = NULL;
}
- if (priv->notify) {
- priv->notify (priv->user_data);
- }
- priv->user_data = NULL;
- priv->notify = NULL;
-
GST_OBJECT_UNLOCK (appsrc);
g_mutex_lock (&priv->mutex);
+ if (priv->callbacks)
+ callbacks = g_steal_pointer (&priv->callbacks);
gst_app_src_flush_queued (appsrc, FALSE);
g_mutex_unlock (&priv->mutex);
+ g_clear_pointer (&callbacks, callbacks_unref);
+
G_OBJECT_CLASS (parent_class)->dispose (obj);
}
@@ -997,6 +1023,8 @@ gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
GstAppSrcPrivate *priv = appsrc->priv;
gint64 desired_position;
gboolean res = FALSE;
+ gboolean emit;
+ Callbacks *callbacks = NULL;
desired_position = segment->position;
@@ -1007,20 +1035,23 @@ gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
desired_position, gst_format_get_name (segment->format));
- if (priv->callbacks.seek_data)
- res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data);
- else {
- gboolean emit;
-
- g_mutex_lock (&priv->mutex);
- emit = priv->emit_signals;
- g_mutex_unlock (&priv->mutex);
+ g_mutex_lock (&priv->mutex);
+ emit = priv->emit_signals;
+ if (priv->callbacks)
+ callbacks = callbacks_ref (priv->callbacks);
+ g_mutex_unlock (&priv->mutex);
- if (emit)
- g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
- desired_position, &res);
+ if (callbacks && callbacks->callbacks.seek_data) {
+ res =
+ callbacks->callbacks.seek_data (appsrc, desired_position,
+ callbacks->user_data);
+ } else if (emit) {
+ g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
+ desired_position, &res);
}
+ g_clear_pointer (&callbacks, callbacks_unref);
+
if (res) {
GST_DEBUG_OBJECT (appsrc, "flushing queue");
g_mutex_lock (&priv->mutex);
@@ -1041,20 +1072,25 @@ gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
gboolean res = FALSE;
gboolean emit;
GstAppSrcPrivate *priv = appsrc->priv;
+ Callbacks *callbacks = NULL;
emit = priv->emit_signals;
+ if (priv->callbacks)
+ callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex);
GST_DEBUG_OBJECT (appsrc,
"we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
priv->offset, offset);
- if (priv->callbacks.seek_data)
- res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
+ if (callbacks && callbacks->callbacks.seek_data)
+ res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
offset, &res);
+ g_clear_pointer (&callbacks, callbacks_unref);
+
g_mutex_lock (&priv->mutex);
return res;
@@ -1067,17 +1103,22 @@ gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
{
gboolean emit;
GstAppSrcPrivate *priv = appsrc->priv;
+ Callbacks *callbacks = NULL;
emit = priv->emit_signals;
+ if (priv->callbacks)
+ callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex);
/* we have no data, we need some. We fire the signal with the size hint. */
- if (priv->callbacks.need_data)
- priv->callbacks.need_data (appsrc, size, priv->user_data);
+ if (callbacks && callbacks->callbacks.need_data)
+ callbacks->callbacks.need_data (appsrc, size, callbacks->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
NULL);
+ g_clear_pointer (&callbacks, callbacks_unref);
+
g_mutex_lock (&priv->mutex);
/* we can be flushing now because we released the lock */
}
@@ -1817,18 +1858,23 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
priv->queued_bytes, priv->max_bytes);
if (first) {
+ Callbacks *callbacks = NULL;
gboolean emit;
emit = priv->emit_signals;
+ if (priv->callbacks)
+ callbacks = callbacks_ref (priv->callbacks);
/* only signal on the first push */
g_mutex_unlock (&priv->mutex);
- if (priv->callbacks.enough_data)
- priv->callbacks.enough_data (appsrc, priv->user_data);
+ if (callbacks && callbacks->callbacks.enough_data)
+ callbacks->callbacks.enough_data (appsrc, callbacks->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
NULL);
+ g_clear_pointer (&callbacks, callbacks_unref);
+
g_mutex_lock (&priv->mutex);
/* continue to check for flushing/eos after releasing the lock */
first = FALSE;
@@ -2081,12 +2127,15 @@ flushing:
*
* If callbacks are installed, no signals will be emitted for performance
* reasons.
+ *
+ * Before 1.16.3 it was not possible to change the callbacks in a thread-safe
+ * way.
*/
void
gst_app_src_set_callbacks (GstAppSrc * appsrc,
GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
{
- GDestroyNotify old_notify;
+ Callbacks *old_callbacks, *new_callbacks = NULL;
GstAppSrcPrivate *priv;
g_return_if_fail (GST_IS_APP_SRC (appsrc));
@@ -2094,26 +2143,20 @@ gst_app_src_set_callbacks (GstAppSrc * appsrc,
priv = appsrc->priv;
- GST_OBJECT_LOCK (appsrc);
- old_notify = priv->notify;
-
- if (old_notify) {
- gpointer old_data;
-
- old_data = priv->user_data;
-
- priv->user_data = NULL;
- priv->notify = NULL;
- GST_OBJECT_UNLOCK (appsrc);
+ if (callbacks) {
+ new_callbacks = g_new0 (Callbacks, 1);
+ new_callbacks->callbacks = *callbacks;
+ new_callbacks->user_data = user_data;
+ new_callbacks->destroy_notify = notify;
+ new_callbacks->ref_count = 1;
+ }
- old_notify (old_data);
+ g_mutex_lock (&priv->mutex);
+ old_callbacks = g_steal_pointer (&priv->callbacks);
+ priv->callbacks = g_steal_pointer (&new_callbacks);
+ g_mutex_unlock (&priv->mutex);
- GST_OBJECT_LOCK (appsrc);
- }
- priv->callbacks = *callbacks;
- priv->user_data = user_data;
- priv->notify = notify;
- GST_OBJECT_UNLOCK (appsrc);
+ g_clear_pointer (&old_callbacks, callbacks_unref);
}
/*** GSTURIHANDLER INTERFACE *************************************************/