summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Alexander Steffens (heftig) <jan.steffens@gmail.com>2014-09-17 16:48:02 +0200
committerThibault Saunier <tsaunier@gnome.org>2015-05-11 16:10:10 +0200
commit2d1ac47a57d7e72c125975353c9e9dac691f50b6 (patch)
tree82e2f2a789c524bdf3b92a5b76231d71b442bd0a
parent18da30a929401e5c7478297445b50907b2940b54 (diff)
downloadgstreamer-plugins-bad-2d1ac47a57d7e72c125975353c9e9dac691f50b6.tar.gz
aggregator: Replace GMainContext with GAsyncQueue
The previous implementation kept accumulating GSources, slowing down the iteration and leaking memory. Instead of trying to fix the mcontext flushing, replace it with a GAsyncQueue which is simple to flush and has less overhead. https://bugzilla.gnome.org/show_bug.cgi?id=736782
-rw-r--r--gst-libs/gst/base/gstaggregator.c128
1 files changed, 42 insertions, 86 deletions
diff --git a/gst-libs/gst/base/gstaggregator.c b/gst-libs/gst/base/gstaggregator.c
index e12ed9372..86b5377b0 100644
--- a/gst-libs/gst/base/gstaggregator.c
+++ b/gst-libs/gst/base/gstaggregator.c
@@ -136,34 +136,41 @@ _aggpad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
*************************************/
static GstElementClass *aggregator_parent_class = NULL;
-#define MAIN_CONTEXT_LOCK(self) G_STMT_START { \
- GST_LOG_OBJECT (self, "Getting MAIN_CONTEXT_LOCK in thread %p", \
+#define AGGREGATOR_QUEUE(self) (((GstAggregator*)self)->priv->queue)
+
+#define QUEUE_PUSH(self) G_STMT_START { \
+ GST_LOG_OBJECT (self, "Pushing to QUEUE in thread %p", \
+ g_thread_self()); \
+ g_async_queue_push (AGGREGATOR_QUEUE (self), GINT_TO_POINTER (1)); \
+} G_STMT_END
+
+#define QUEUE_POP(self) G_STMT_START { \
+ GST_LOG_OBJECT (self, "Waiting on QUEUE in thread %p", \
g_thread_self()); \
- g_mutex_lock(&((GstAggregator*)self)->priv->mcontext_lock); \
- GST_LOG_OBJECT (self, "Got MAIN_CONTEXT_LOCK in thread %p", \
+ g_async_queue_pop (AGGREGATOR_QUEUE (self)); \
+ GST_LOG_OBJECT (self, "Waited on QUEUE in thread %p", \
g_thread_self()); \
} G_STMT_END
-#define MAIN_CONTEXT_UNLOCK(self) G_STMT_START { \
- g_mutex_unlock(&((GstAggregator*)self)->priv->mcontext_lock); \
- GST_LOG_OBJECT (self, "Unlocked MAIN_CONTEXT_LOCK in thread %p", \
- g_thread_self()); \
+#define QUEUE_FLUSH(self) G_STMT_START { \
+ GST_LOG_OBJECT (self, "Flushing QUEUE in thread %p", \
+ g_thread_self()); \
+ g_async_queue_lock (AGGREGATOR_QUEUE (self)); \
+ while (g_async_queue_try_pop_unlocked (AGGREGATOR_QUEUE (self))); \
+ g_async_queue_unlock (AGGREGATOR_QUEUE (self)); \
+ GST_LOG_OBJECT (self, "Flushed QUEUE in thread %p", \
+ g_thread_self()); \
} G_STMT_END
struct _GstAggregatorPrivate
{
gint padcount;
- GMainContext *mcontext;
+ GAsyncQueue *queue;
/* Our state is >= PAUSED */
gboolean running;
- /* Ensure that when we remove all sources from the maincontext
- * we can not add any source, avoiding:
- * "g_source_attach: assertion '!SOURCE_DESTROYED (source)' failed" */
- GMutex mcontext_lock;
- GList *gsources;
gint seqnum;
gboolean send_stream_start;
@@ -388,31 +395,20 @@ _push_eos (GstAggregator * self)
gst_pad_push_event (self->srcpad, event);
}
-
-static void
-_destroy_gsource (GSource * source)
-{
- g_source_destroy (source);
- g_source_unref (source);
-}
-
static void
-_remove_all_sources (GstAggregator * self)
-{
- GstAggregatorPrivate *priv = self->priv;
-
- MAIN_CONTEXT_LOCK (self);
- g_list_free_full (priv->gsources, (GDestroyNotify) _destroy_gsource);
- priv->gsources = NULL;
- MAIN_CONTEXT_UNLOCK (self);
-}
-
-static gboolean
aggregate_func (GstAggregator * self)
{
GstAggregatorPrivate *priv = self->priv;
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
+ if (self->priv->running == FALSE) {
+ GST_DEBUG_OBJECT (self, "Not running anymore");
+
+ return;
+ }
+
+ QUEUE_POP (self);
+
GST_LOG_OBJECT (self, "Checking aggregate");
while (priv->send_eos && gst_aggregator_iterate_sinkpads (self,
(GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
@@ -422,8 +418,7 @@ aggregate_func (GstAggregator * self)
priv->flow_return = klass->aggregate (self);
if (priv->flow_return == GST_FLOW_EOS) {
- g_main_context_wakeup (self->priv->mcontext);
- _remove_all_sources (self);
+ QUEUE_FLUSH (self);
_push_eos (self);
}
@@ -438,19 +433,6 @@ aggregate_func (GstAggregator * self)
break;
}
- return G_SOURCE_REMOVE;
-}
-
-static void
-iterate_main_context_func (GstAggregator * self)
-{
- if (self->priv->running == FALSE) {
- GST_DEBUG_OBJECT (self, "Not running anymore");
-
- return;
- }
-
- g_main_context_iteration (self->priv->mcontext, TRUE);
}
static gboolean
@@ -481,15 +463,14 @@ _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
flush_start ? "Pausing" : "Stopping");
self->priv->running = FALSE;
+ QUEUE_PUSH (self);
- /* Clean the stack of GSource set on the MainContext */
- g_main_context_wakeup (self->priv->mcontext);
- _remove_all_sources (self);
if (flush_start) {
res = gst_pad_push_event (self->srcpad, flush_start);
}
gst_pad_stop_task (self->srcpad);
+ QUEUE_FLUSH (self);
return res;
}
@@ -501,21 +482,7 @@ _start_srcpad_task (GstAggregator * self)
self->priv->running = TRUE;
gst_pad_start_task (GST_PAD (self->srcpad),
- (GstTaskFunction) iterate_main_context_func, self, NULL);
-}
-
-static inline void
-_add_aggregate_gsource (GstAggregator * self)
-{
- GSource *source;
- GstAggregatorPrivate *priv = self->priv;
-
- MAIN_CONTEXT_LOCK (self);
- source = g_idle_source_new ();
- g_source_set_callback (source, (GSourceFunc) aggregate_func, self, NULL);
- priv->gsources = g_list_prepend (priv->gsources, source);
- g_source_attach (source, priv->mcontext);
- MAIN_CONTEXT_UNLOCK (self);
+ (GstTaskFunction) aggregate_func, self, NULL);
}
static GstFlowReturn
@@ -617,7 +584,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
gst_pad_push_event (self->srcpad, event);
priv->send_eos = TRUE;
event = NULL;
- _add_aggregate_gsource (self);
+ QUEUE_PUSH (self);
GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
GST_PAD_STREAM_UNLOCK (self->srcpad);
@@ -645,7 +612,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
}
PAD_UNLOCK_EVENT (aggpad);
- _add_aggregate_gsource (self);
+ QUEUE_PUSH (self);
goto eat;
}
case GST_EVENT_SEGMENT:
@@ -769,7 +736,7 @@ _release_pad (GstElement * element, GstPad * pad)
gst_element_remove_pad (element, pad);
/* Something changed make sure we try to aggregate */
- _add_aggregate_gsource (self);
+ QUEUE_PUSH (self);
}
static GstPad *
@@ -1051,24 +1018,16 @@ _sink_query (GstAggregator * self, GstAggregatorPad * aggpad, GstQuery * query)
}
static void
-gst_aggregator_finalize (GObject * object)
-{
- GstAggregator *self = (GstAggregator *) object;
-
- g_mutex_clear (&self->priv->mcontext_lock);
-
- G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
-}
-
-static void
gst_aggregator_dispose (GObject * object)
{
GstAggregator *self = (GstAggregator *) object;
G_OBJECT_CLASS (aggregator_parent_class)->dispose (object);
- g_main_context_unref (self->priv->mcontext);
- _remove_all_sources (self);
+ if (AGGREGATOR_QUEUE (self)) {
+ g_async_queue_unref (AGGREGATOR_QUEUE (self));
+ AGGREGATOR_QUEUE (self) = NULL;
+ }
}
/* GObject vmethods implementations */
@@ -1098,7 +1057,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
gstelement_class->release_pad = GST_DEBUG_FUNCPTR (_release_pad);
gstelement_class->change_state = GST_DEBUG_FUNCPTR (_change_state);
- gobject_class->finalize = gst_aggregator_finalize;
gobject_class->dispose = gst_aggregator_dispose;
}
@@ -1124,7 +1082,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
priv->tags_changed = FALSE;
_reset_flow_values (self);
- priv->mcontext = g_main_context_new ();
+ AGGREGATOR_QUEUE (self) = g_async_queue_new ();
self->srcpad = gst_pad_new_from_template (pad_template, "src");
gst_pad_set_event_function (self->srcpad,
@@ -1135,8 +1093,6 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) src_activate_mode));
gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
-
- g_mutex_init (&self->priv->mcontext_lock);
}
/* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
@@ -1205,7 +1161,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
aggpad->buffer = actual_buf;
PAD_UNLOCK_EVENT (aggpad);
- _add_aggregate_gsource (self);
+ QUEUE_PUSH (self);
GST_DEBUG_OBJECT (aggpad, "Done chaining");