From 2d1ac47a57d7e72c125975353c9e9dac691f50b6 Mon Sep 17 00:00:00 2001 From: "Jan Alexander Steffens (heftig)" Date: Wed, 17 Sep 2014 16:48:02 +0200 Subject: aggregator: Replace GMainContext with GAsyncQueue The previous implementation kept accumulating GSources, slowing down the iteration and leaking memory. Instead of trying to fix the mcontext flushing, replace it with a GAsyncQueue which is simple to flush and has less overhead. https://bugzilla.gnome.org/show_bug.cgi?id=736782 --- gst-libs/gst/base/gstaggregator.c | 128 +++++++++++++------------------------- 1 file changed, 42 insertions(+), 86 deletions(-) diff --git a/gst-libs/gst/base/gstaggregator.c b/gst-libs/gst/base/gstaggregator.c index e12ed9372..86b5377b0 100644 --- a/gst-libs/gst/base/gstaggregator.c +++ b/gst-libs/gst/base/gstaggregator.c @@ -136,34 +136,41 @@ _aggpad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) *************************************/ static GstElementClass *aggregator_parent_class = NULL; -#define MAIN_CONTEXT_LOCK(self) G_STMT_START { \ - GST_LOG_OBJECT (self, "Getting MAIN_CONTEXT_LOCK in thread %p", \ +#define AGGREGATOR_QUEUE(self) (((GstAggregator*)self)->priv->queue) + +#define QUEUE_PUSH(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Pushing to QUEUE in thread %p", \ + g_thread_self()); \ + g_async_queue_push (AGGREGATOR_QUEUE (self), GINT_TO_POINTER (1)); \ +} G_STMT_END + +#define QUEUE_POP(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Waiting on QUEUE in thread %p", \ g_thread_self()); \ - g_mutex_lock(&((GstAggregator*)self)->priv->mcontext_lock); \ - GST_LOG_OBJECT (self, "Got MAIN_CONTEXT_LOCK in thread %p", \ + g_async_queue_pop (AGGREGATOR_QUEUE (self)); \ + GST_LOG_OBJECT (self, "Waited on QUEUE in thread %p", \ g_thread_self()); \ } G_STMT_END -#define MAIN_CONTEXT_UNLOCK(self) G_STMT_START { \ - g_mutex_unlock(&((GstAggregator*)self)->priv->mcontext_lock); \ - GST_LOG_OBJECT (self, "Unlocked MAIN_CONTEXT_LOCK in thread %p", \ - g_thread_self()); \ +#define QUEUE_FLUSH(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Flushing QUEUE in thread %p", \ + g_thread_self()); \ + g_async_queue_lock (AGGREGATOR_QUEUE (self)); \ + while (g_async_queue_try_pop_unlocked (AGGREGATOR_QUEUE (self))); \ + g_async_queue_unlock (AGGREGATOR_QUEUE (self)); \ + GST_LOG_OBJECT (self, "Flushed QUEUE in thread %p", \ + g_thread_self()); \ } G_STMT_END struct _GstAggregatorPrivate { gint padcount; - GMainContext *mcontext; + GAsyncQueue *queue; /* Our state is >= PAUSED */ gboolean running; - /* Ensure that when we remove all sources from the maincontext - * we can not add any source, avoiding: - * "g_source_attach: assertion '!SOURCE_DESTROYED (source)' failed" */ - GMutex mcontext_lock; - GList *gsources; gint seqnum; gboolean send_stream_start; @@ -388,31 +395,20 @@ _push_eos (GstAggregator * self) gst_pad_push_event (self->srcpad, event); } - -static void -_destroy_gsource (GSource * source) -{ - g_source_destroy (source); - g_source_unref (source); -} - static void -_remove_all_sources (GstAggregator * self) -{ - GstAggregatorPrivate *priv = self->priv; - - MAIN_CONTEXT_LOCK (self); - g_list_free_full (priv->gsources, (GDestroyNotify) _destroy_gsource); - priv->gsources = NULL; - MAIN_CONTEXT_UNLOCK (self); -} - -static gboolean aggregate_func (GstAggregator * self) { GstAggregatorPrivate *priv = self->priv; GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); + if (self->priv->running == FALSE) { + GST_DEBUG_OBJECT (self, "Not running anymore"); + + return; + } + + QUEUE_POP (self); + GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && gst_aggregator_iterate_sinkpads (self, (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, @@ -422,8 +418,7 @@ aggregate_func (GstAggregator * self) priv->flow_return = klass->aggregate (self); if (priv->flow_return == GST_FLOW_EOS) { - g_main_context_wakeup (self->priv->mcontext); - _remove_all_sources (self); + QUEUE_FLUSH (self); _push_eos (self); } @@ -438,19 +433,6 @@ aggregate_func (GstAggregator * self) break; } - return G_SOURCE_REMOVE; -} - -static void -iterate_main_context_func (GstAggregator * self) -{ - if (self->priv->running == FALSE) { - GST_DEBUG_OBJECT (self, "Not running anymore"); - - return; - } - - g_main_context_iteration (self->priv->mcontext, TRUE); } static gboolean @@ -481,15 +463,14 @@ _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start) flush_start ? "Pausing" : "Stopping"); self->priv->running = FALSE; + QUEUE_PUSH (self); - /* Clean the stack of GSource set on the MainContext */ - g_main_context_wakeup (self->priv->mcontext); - _remove_all_sources (self); if (flush_start) { res = gst_pad_push_event (self->srcpad, flush_start); } gst_pad_stop_task (self->srcpad); + QUEUE_FLUSH (self); return res; } @@ -501,21 +482,7 @@ _start_srcpad_task (GstAggregator * self) self->priv->running = TRUE; gst_pad_start_task (GST_PAD (self->srcpad), - (GstTaskFunction) iterate_main_context_func, self, NULL); -} - -static inline void -_add_aggregate_gsource (GstAggregator * self) -{ - GSource *source; - GstAggregatorPrivate *priv = self->priv; - - MAIN_CONTEXT_LOCK (self); - source = g_idle_source_new (); - g_source_set_callback (source, (GSourceFunc) aggregate_func, self, NULL); - priv->gsources = g_list_prepend (priv->gsources, source); - g_source_attach (source, priv->mcontext); - MAIN_CONTEXT_UNLOCK (self); + (GstTaskFunction) aggregate_func, self, NULL); } static GstFlowReturn @@ -617,7 +584,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) gst_pad_push_event (self->srcpad, event); priv->send_eos = TRUE; event = NULL; - _add_aggregate_gsource (self); + QUEUE_PUSH (self); GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK"); GST_PAD_STREAM_UNLOCK (self->srcpad); @@ -645,7 +612,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) } PAD_UNLOCK_EVENT (aggpad); - _add_aggregate_gsource (self); + QUEUE_PUSH (self); goto eat; } case GST_EVENT_SEGMENT: @@ -769,7 +736,7 @@ _release_pad (GstElement * element, GstPad * pad) gst_element_remove_pad (element, pad); /* Something changed make sure we try to aggregate */ - _add_aggregate_gsource (self); + QUEUE_PUSH (self); } static GstPad * @@ -1050,16 +1017,6 @@ _sink_query (GstAggregator * self, GstAggregatorPad * aggpad, GstQuery * query) return gst_pad_query_default (pad, GST_OBJECT (self), query); } -static void -gst_aggregator_finalize (GObject * object) -{ - GstAggregator *self = (GstAggregator *) object; - - g_mutex_clear (&self->priv->mcontext_lock); - - G_OBJECT_CLASS (aggregator_parent_class)->finalize (object); -} - static void gst_aggregator_dispose (GObject * object) { @@ -1067,8 +1024,10 @@ gst_aggregator_dispose (GObject * object) G_OBJECT_CLASS (aggregator_parent_class)->dispose (object); - g_main_context_unref (self->priv->mcontext); - _remove_all_sources (self); + if (AGGREGATOR_QUEUE (self)) { + g_async_queue_unref (AGGREGATOR_QUEUE (self)); + AGGREGATOR_QUEUE (self) = NULL; + } } /* GObject vmethods implementations */ @@ -1098,7 +1057,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass) gstelement_class->release_pad = GST_DEBUG_FUNCPTR (_release_pad); gstelement_class->change_state = GST_DEBUG_FUNCPTR (_change_state); - gobject_class->finalize = gst_aggregator_finalize; gobject_class->dispose = gst_aggregator_dispose; } @@ -1124,7 +1082,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) priv->tags_changed = FALSE; _reset_flow_values (self); - priv->mcontext = g_main_context_new (); + AGGREGATOR_QUEUE (self) = g_async_queue_new (); self->srcpad = gst_pad_new_from_template (pad_template, "src"); gst_pad_set_event_function (self->srcpad, @@ -1135,8 +1093,6 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) src_activate_mode)); gst_element_add_pad (GST_ELEMENT (self), self->srcpad); - - g_mutex_init (&self->priv->mcontext_lock); } /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init @@ -1205,7 +1161,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) aggpad->buffer = actual_buf; PAD_UNLOCK_EVENT (aggpad); - _add_aggregate_gsource (self); + QUEUE_PUSH (self); GST_DEBUG_OBJECT (aggpad, "Done chaining"); -- cgit v1.2.1