diff options
author | Tim-Philipp Müller <tim@centricular.com> | 2017-12-02 16:01:25 +0000 |
---|---|---|
committer | Tim-Philipp Müller <tim@centricular.com> | 2017-12-02 16:14:36 +0000 |
commit | d01297e115e475760764d9caa4eb37b5f7f9f31a (patch) | |
tree | ac7a05eadf2f5acb747b6a7b052ec444bac59437 /gst-libs | |
parent | 737067e793ab0bda60b7ec28af9b6fa51ace4581 (diff) | |
download | gstreamer-plugins-bad-d01297e115e475760764d9caa4eb37b5f7f9f31a.tar.gz |
Remove GstAggregator from -bad, moved to core
https://bugzilla.gnome.org/show_bug.cgi?id=739010
Diffstat (limited to 'gst-libs')
-rw-r--r-- | gst-libs/gst/Makefile.am | 7 | ||||
-rw-r--r-- | gst-libs/gst/audio/Makefile.am | 1 | ||||
-rw-r--r-- | gst-libs/gst/audio/meson.build | 4 | ||||
-rw-r--r-- | gst-libs/gst/base/Makefile.am | 16 | ||||
-rw-r--r-- | gst-libs/gst/base/gstaggregator.c | 3022 | ||||
-rw-r--r-- | gst-libs/gst/base/gstaggregator.h | 336 | ||||
-rw-r--r-- | gst-libs/gst/base/meson.build | 17 | ||||
-rw-r--r-- | gst-libs/gst/meson.build | 1 | ||||
-rw-r--r-- | gst-libs/gst/video/Makefile.am | 1 | ||||
-rw-r--r-- | gst-libs/gst/video/meson.build | 4 |
10 files changed, 7 insertions, 3402 deletions
diff --git a/gst-libs/gst/Makefile.am b/gst-libs/gst/Makefile.am index 020e01afc..1939611a2 100644 --- a/gst-libs/gst/Makefile.am +++ b/gst-libs/gst/Makefile.am @@ -15,22 +15,21 @@ OPENCV_DIR=opencv endif SUBDIRS = uridownloader adaptivedemux interfaces basecamerabinsrc codecparsers \ - insertbin mpegts base video audio player allocators isoff $(GL_DIR) $(WAYLAND_DIR) \ + insertbin mpegts video audio player allocators isoff $(GL_DIR) $(WAYLAND_DIR) \ $(OPENCV_DIR) noinst_HEADERS = gst-i18n-plugin.h gettext.h glib-compat-private.h DIST_SUBDIRS = uridownloader adaptivedemux interfaces gl basecamerabinsrc \ - codecparsers insertbin mpegts wayland opencv base video audio player allocators isoff + codecparsers insertbin mpegts wayland opencv video audio player allocators isoff #dependencies -video, audio: base gl: allocators adaptivedemux: uridownloader INDEPENDENT_SUBDIRS = \ interfaces basecamerabinsrc codecparsers insertbin uridownloader \ - mpegts base player allocators isoff $(GL_DIR) $(WAYLAND_DIR) $(OPENCV_DIR) + mpegts player allocators isoff $(GL_DIR) $(WAYLAND_DIR) $(OPENCV_DIR) .PHONY: independent-subdirs $(INDEPENDENT_SUBDIRS) diff --git a/gst-libs/gst/audio/Makefile.am b/gst-libs/gst/audio/Makefile.am index 02154fa1d..ca9e3f7e4 100644 --- a/gst-libs/gst/audio/Makefile.am +++ b/gst-libs/gst/audio/Makefile.am @@ -18,7 +18,6 @@ libgstbadaudio_@GST_API_VERSION@_la_CFLAGS = \ $(GST_BASE_CFLAGS) libgstbadaudio_@GST_API_VERSION@_la_LIBADD = \ - $(top_builddir)/gst-libs/gst/base/libgstbadbase-$(GST_API_VERSION).la \ $(GST_PLUGINS_BASE_LIBS) -lgstaudio-$(GST_API_VERSION) \ $(GST_BASE_LIBS) $(GST_LIBS) $(ORC_LIBS) $(LIBM) diff --git a/gst-libs/gst/audio/meson.build b/gst-libs/gst/audio/meson.build index 3154e3ca3..ac4871903 100644 --- a/gst-libs/gst/audio/meson.build +++ b/gst-libs/gst/audio/meson.build @@ -10,9 +10,9 @@ gstbadaudio = library('gstbadaudio-' + api_version, version : libversion, soversion : soversion, install : true, - dependencies : [gstbadbase_dep, gstaudio_dep], + dependencies : [gstaudio_dep, gstbase_dep], ) gstbadaudio_dep = declare_dependency(link_with : gstbadaudio, include_directories : [libsinc], - dependencies : [gstbadbase_dep, gstaudio_dep]) + dependencies : [gstaudio_dep, gstbase_dep]) diff --git a/gst-libs/gst/base/Makefile.am b/gst-libs/gst/base/Makefile.am deleted file mode 100644 index 993701d77..000000000 --- a/gst-libs/gst/base/Makefile.am +++ /dev/null @@ -1,16 +0,0 @@ -lib_LTLIBRARIES = libgstbadbase-@GST_API_VERSION@.la - -libgstbadbase_@GST_API_VERSION@_la_SOURCES = \ - gstaggregator.c - -libgstbadbase_@GST_API_VERSION@_la_CFLAGS = $(GST_CFLAGS) \ - -DGST_USE_UNSTABLE_API - -libgstbadbase_@GST_API_VERSION@_la_LIBADD = $(GST_LIBS) -libgstbadbase_@GST_API_VERSION@_la_LDFLAGS = $(GST_LIB_LDFLAGS) $(GST_ALL_LDFLAGS) $(GST_LT_LDFLAGS) - -libgstbase_@GST_API_VERSION@includedir = $(includedir)/gstreamer-@GST_API_VERSION@/gst/base -libgstbase_@GST_API_VERSION@include_HEADERS = gstaggregator.h - -EXTRA_DIST = - diff --git a/gst-libs/gst/base/gstaggregator.c b/gst-libs/gst/base/gstaggregator.c deleted file mode 100644 index 9dddf837c..000000000 --- a/gst-libs/gst/base/gstaggregator.c +++ /dev/null @@ -1,3022 +0,0 @@ -/* GStreamer aggregator base class - * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com> - * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org> - * - * gstaggregator.c: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ -/** - * SECTION: gstaggregator - * @title: GstAggregator - * @short_description: manages a set of pads with the purpose of - * aggregating their buffers. - * @see_also: gstcollectpads for historical reasons. - * - * Manages a set of pads with the purpose of aggregating their buffers. - * Control is given to the subclass when all pads have data. - * - * * Base class for mixers and muxers. Subclasses should at least implement - * the #GstAggregatorClass.aggregate() virtual method. - * - * * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a - * #GstPadQueryFunction to queue all serialized data packets per sink pad. - * Subclasses should not overwrite those, but instead implement - * #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as - * needed. - * - * * When data is queued on all pads, the aggregate vmethod is called. - * - * * One can peek at the data on any given GstAggregatorPad with the - * gst_aggregator_pad_get_buffer () method, and take ownership of it - * with the gst_aggregator_pad_steal_buffer () method. When a buffer - * has been taken with steal_buffer (), a new buffer can be queued - * on that pad. - * - * * If the subclass wishes to push a buffer downstream in its aggregate - * implementation, it should do so through the - * gst_aggregator_finish_buffer () method. This method will take care - * of sending and ordering mandatory events such as stream start, caps - * and segment. - * - * * Same goes for EOS events, which should not be pushed directly by the - * subclass, it should instead return GST_FLOW_EOS in its aggregate - * implementation. - * - * * Note that the aggregator logic regarding gap event handling is to turn - * these into gap buffers with matching PTS and duration. It will also - * flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE - * to ease their identification and subsequent processing. - * - */ - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include <string.h> /* strlen */ - -#include "gstaggregator.h" - -typedef enum -{ - GST_AGGREGATOR_START_TIME_SELECTION_ZERO, - GST_AGGREGATOR_START_TIME_SELECTION_FIRST, - GST_AGGREGATOR_START_TIME_SELECTION_SET -} GstAggregatorStartTimeSelection; - -static GType -gst_aggregator_start_time_selection_get_type (void) -{ - static GType gtype = 0; - - if (gtype == 0) { - static const GEnumValue values[] = { - {GST_AGGREGATOR_START_TIME_SELECTION_ZERO, - "Start at 0 running time (default)", "zero"}, - {GST_AGGREGATOR_START_TIME_SELECTION_FIRST, - "Start at first observed input running time", "first"}, - {GST_AGGREGATOR_START_TIME_SELECTION_SET, - "Set start time with start-time property", "set"}, - {0, NULL, NULL} - }; - - gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values); - } - return gtype; -} - -/* Might become API */ -static void gst_aggregator_merge_tags (GstAggregator * aggregator, - const GstTagList * tags, GstTagMergeMode mode); -static void gst_aggregator_set_latency_property (GstAggregator * agg, - GstClockTime latency); -static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg); - -static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); - -GST_DEBUG_CATEGORY_STATIC (aggregator_debug); -#define GST_CAT_DEFAULT aggregator_debug - -/* Locking order, locks in this element must always be taken in this order - * - * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad) - * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad) - * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad) - * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST - * standard element object lock -> GST_OBJECT_LOCK(agg) - * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad) - * standard src pad object lock -> GST_OBJECT_LOCK(srcpad) - * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad) - */ - -/* GstAggregatorPad definitions */ -#define PAD_LOCK(pad) G_STMT_START { \ - GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \ - g_thread_self()); \ - g_mutex_lock(&pad->priv->lock); \ - GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p", \ - g_thread_self()); \ - } G_STMT_END - -#define PAD_UNLOCK(pad) G_STMT_START { \ - GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p", \ - g_thread_self()); \ - g_mutex_unlock(&pad->priv->lock); \ - GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p", \ - g_thread_self()); \ - } G_STMT_END - - -#define PAD_WAIT_EVENT(pad) G_STMT_START { \ - GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p", \ - g_thread_self()); \ - g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond), \ - (&((GstAggregatorPad*)pad)->priv->lock)); \ - GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \ - g_thread_self()); \ - } G_STMT_END - -#define PAD_BROADCAST_EVENT(pad) G_STMT_START { \ - GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p", \ - g_thread_self()); \ - g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond)); \ - } G_STMT_END - - -#define PAD_FLUSH_LOCK(pad) G_STMT_START { \ - GST_TRACE_OBJECT (pad, "Taking lock from thread %p", \ - g_thread_self()); \ - g_mutex_lock(&pad->priv->flush_lock); \ - GST_TRACE_OBJECT (pad, "Took lock from thread %p", \ - g_thread_self()); \ - } G_STMT_END - -#define PAD_FLUSH_UNLOCK(pad) G_STMT_START { \ - GST_TRACE_OBJECT (pad, "Releasing lock from thread %p", \ - g_thread_self()); \ - g_mutex_unlock(&pad->priv->flush_lock); \ - GST_TRACE_OBJECT (pad, "Release lock from thread %p", \ - g_thread_self()); \ - } G_STMT_END - -#define SRC_LOCK(self) G_STMT_START { \ - GST_TRACE_OBJECT (self, "Taking src lock from thread %p", \ - g_thread_self()); \ - g_mutex_lock(&self->priv->src_lock); \ - GST_TRACE_OBJECT (self, "Took src lock from thread %p", \ - g_thread_self()); \ - } G_STMT_END - -#define SRC_UNLOCK(self) G_STMT_START { \ - GST_TRACE_OBJECT (self, "Releasing src lock from thread %p", \ - g_thread_self()); \ - g_mutex_unlock(&self->priv->src_lock); \ - GST_TRACE_OBJECT (self, "Released src lock from thread %p", \ - g_thread_self()); \ - } G_STMT_END - -#define SRC_WAIT(self) G_STMT_START { \ - GST_LOG_OBJECT (self, "Waiting for src on thread %p", \ - g_thread_self()); \ - g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock)); \ - GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p", \ - g_thread_self()); \ - } G_STMT_END - -#define SRC_BROADCAST(self) G_STMT_START { \ - GST_LOG_OBJECT (self, "Signaling src from thread %p", \ - g_thread_self()); \ - if (self->priv->aggregate_id) \ - gst_clock_id_unschedule (self->priv->aggregate_id); \ - g_cond_broadcast(&(self->priv->src_cond)); \ - } G_STMT_END - -struct _GstAggregatorPadPrivate -{ - /* Following fields are protected by the PAD_LOCK */ - GstFlowReturn flow_return; - gboolean pending_flush_start; - gboolean pending_flush_stop; - - gboolean first_buffer; - - GQueue data; /* buffers, events and queries */ - GstBuffer *clipped_buffer; - guint num_buffers; - - /* used to track fill state of queues, only used with live-src and when - * latency property is set to > 0 */ - GstClockTime head_position; - GstClockTime tail_position; - GstClockTime head_time; /* running time */ - GstClockTime tail_time; - GstClockTime time_level; /* how much head is ahead of tail */ - GstSegment head_segment; /* segment before the queue */ - - gboolean negotiated; - - gboolean eos; - - GMutex lock; - GCond event_cond; - /* This lock prevents a flush start processing happening while - * the chain function is also happening. - */ - GMutex flush_lock; -}; - -/* Must be called with PAD_LOCK held */ -static void -gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad) -{ - aggpad->priv->eos = FALSE; - aggpad->priv->flow_return = GST_FLOW_OK; - GST_OBJECT_LOCK (aggpad); - gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED); - gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED); - GST_OBJECT_UNLOCK (aggpad); - aggpad->priv->head_position = GST_CLOCK_TIME_NONE; - aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; - aggpad->priv->head_time = GST_CLOCK_TIME_NONE; - aggpad->priv->tail_time = GST_CLOCK_TIME_NONE; - aggpad->priv->time_level = 0; - aggpad->priv->first_buffer = TRUE; -} - -static gboolean -gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) -{ - GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad); - - PAD_LOCK (aggpad); - gst_aggregator_pad_reset_unlocked (aggpad); - PAD_UNLOCK (aggpad); - - if (klass->flush) - return klass->flush (aggpad, agg); - - return TRUE; -} - -/************************************* - * GstAggregator implementation * - *************************************/ -static GstElementClass *aggregator_parent_class = NULL; - -/* All members are protected by the object lock unless otherwise noted */ - -struct _GstAggregatorPrivate -{ - gint max_padserial; - - /* Our state is >= PAUSED */ - gboolean running; /* protected by src_lock */ - - /* seqnum from seek or segment, - * to be applied to synthetic segment/eos events */ - gint seqnum; - gboolean send_stream_start; /* protected by srcpad stream lock */ - gboolean send_segment; - gboolean flush_seeking; - gboolean pending_flush_start; - gboolean send_eos; /* protected by srcpad stream lock */ - - GstCaps *srccaps; /* protected by the srcpad stream lock */ - - GstTagList *tags; - gboolean tags_changed; - - gboolean peer_latency_live; /* protected by src_lock */ - GstClockTime peer_latency_min; /* protected by src_lock */ - GstClockTime peer_latency_max; /* protected by src_lock */ - gboolean has_peer_latency; /* protected by src_lock */ - - GstClockTime sub_latency_min; /* protected by src_lock */ - GstClockTime sub_latency_max; /* protected by src_lock */ - - /* aggregate */ - GstClockID aggregate_id; /* protected by src_lock */ - GMutex src_lock; - GCond src_cond; - - gboolean first_buffer; /* protected by object lock */ - GstAggregatorStartTimeSelection start_time_selection; - GstClockTime start_time; - - /* protected by the object lock */ - GstQuery *allocation_query; - GstAllocator *allocator; - GstBufferPool *pool; - GstAllocationParams allocation_params; - - /* properties */ - gint64 latency; /* protected by both src_lock and all pad locks */ -}; - -/* Seek event forwarding helper */ -typedef struct -{ - /* parameters */ - GstEvent *event; - gboolean flush; - gboolean only_to_active_pads; - - /* results */ - gboolean result; - gboolean one_actually_seeked; -} EventData; - -#define DEFAULT_LATENCY 0 -#define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO -#define DEFAULT_START_TIME (-1) - -enum -{ - PROP_0, - PROP_LATENCY, - PROP_START_TIME_SELECTION, - PROP_START_TIME, - PROP_LAST -}; - -static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, - GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head); - -static gboolean -gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) -{ - return (g_queue_peek_tail (&pad->priv->data) == NULL && - pad->priv->clipped_buffer == NULL); -} - -static gboolean -gst_aggregator_check_pads_ready (GstAggregator * self) -{ - GstAggregatorPad *pad; - GList *l, *sinkpads; - gboolean have_buffer = TRUE; - gboolean have_event_or_query = FALSE; - - GST_LOG_OBJECT (self, "checking pads"); - - GST_OBJECT_LOCK (self); - - sinkpads = GST_ELEMENT_CAST (self)->sinkpads; - if (sinkpads == NULL) - goto no_sinkpads; - - for (l = sinkpads; l != NULL; l = l->next) { - pad = l->data; - - PAD_LOCK (pad); - - if (pad->priv->num_buffers == 0) { - if (!gst_aggregator_pad_queue_is_empty (pad)) - have_event_or_query = TRUE; - if (!pad->priv->eos) { - have_buffer = FALSE; - - /* If not live we need data on all pads, so leave the loop */ - if (!self->priv->peer_latency_live) { - PAD_UNLOCK (pad); - goto pad_not_ready; - } - } - } else if (self->priv->peer_latency_live) { - /* In live mode, having a single pad with buffers is enough to - * generate a start time from it. In non-live mode all pads need - * to have a buffer - */ - self->priv->first_buffer = FALSE; - } - - PAD_UNLOCK (pad); - } - - if (!have_buffer && !have_event_or_query) - goto pad_not_ready; - - if (have_buffer) - self->priv->first_buffer = FALSE; - - GST_OBJECT_UNLOCK (self); - GST_LOG_OBJECT (self, "pads are ready"); - return TRUE; - -no_sinkpads: - { - GST_LOG_OBJECT (self, "pads not ready: no sink pads"); - GST_OBJECT_UNLOCK (self); - return FALSE; - } -pad_not_ready: - { - if (have_event_or_query) - GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet," - " but waking up for serialized event"); - else - GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet"); - GST_OBJECT_UNLOCK (self); - return have_event_or_query; - } -} - -static void -gst_aggregator_reset_flow_values (GstAggregator * self) -{ - GST_OBJECT_LOCK (self); - self->priv->send_stream_start = TRUE; - self->priv->send_segment = TRUE; - gst_segment_init (&self->segment, GST_FORMAT_TIME); - self->priv->first_buffer = TRUE; - GST_OBJECT_UNLOCK (self); -} - -static inline void -gst_aggregator_push_mandatory_events (GstAggregator * self) -{ - GstAggregatorPrivate *priv = self->priv; - GstEvent *segment = NULL; - GstEvent *tags = NULL; - - if (self->priv->send_stream_start) { - gchar s_id[32]; - - GST_INFO_OBJECT (self, "pushing stream start"); - /* stream-start (FIXME: create id based on input ids) */ - g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ()); - if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) { - GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed"); - } - self->priv->send_stream_start = FALSE; - } - - if (self->priv->srccaps) { - - GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT, - self->priv->srccaps); - if (!gst_pad_push_event (self->srcpad, - gst_event_new_caps (self->priv->srccaps))) { - GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed"); - } - gst_caps_unref (self->priv->srccaps); - self->priv->srccaps = NULL; - } - - GST_OBJECT_LOCK (self); - if (self->priv->send_segment && !self->priv->flush_seeking) { - segment = gst_event_new_segment (&self->segment); - - if (!self->priv->seqnum) - /* This code-path is in preparation to be able to run without a source - * connected. Then we won't have a seq-num from a segment event. */ - self->priv->seqnum = gst_event_get_seqnum (segment); - else - gst_event_set_seqnum (segment, self->priv->seqnum); - self->priv->send_segment = FALSE; - - GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment); - } - - if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) { - tags = gst_event_new_tag (gst_tag_list_ref (priv->tags)); - priv->tags_changed = FALSE; - } - GST_OBJECT_UNLOCK (self); - - if (segment) - gst_pad_push_event (self->srcpad, segment); - if (tags) - gst_pad_push_event (self->srcpad, tags); - -} - -/** - * gst_aggregator_set_src_caps: - * @self: The #GstAggregator - * @caps: The #GstCaps to set on the src pad. - * - * Sets the caps to be used on the src pad. - */ -void -gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps) -{ - GST_PAD_STREAM_LOCK (self->srcpad); - gst_caps_replace (&self->priv->srccaps, caps); - gst_aggregator_push_mandatory_events (self); - GST_PAD_STREAM_UNLOCK (self->srcpad); -} - -/** - * gst_aggregator_finish_buffer: - * @self: The #GstAggregator - * @buffer: (transfer full): the #GstBuffer to push. - * - * This method will push the provided output buffer downstream. If needed, - * mandatory events such as stream-start, caps, and segment events will be - * sent before pushing the buffer. - */ -GstFlowReturn -gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) -{ - gst_aggregator_push_mandatory_events (self); - - GST_OBJECT_LOCK (self); - if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) { - GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer); - GST_OBJECT_UNLOCK (self); - return gst_pad_push (self->srcpad, buffer); - } else { - GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)", - self->priv->flush_seeking, gst_pad_is_active (self->srcpad)); - GST_OBJECT_UNLOCK (self); - gst_buffer_unref (buffer); - return GST_FLOW_OK; - } -} - -static void -gst_aggregator_push_eos (GstAggregator * self) -{ - GstEvent *event; - gst_aggregator_push_mandatory_events (self); - - event = gst_event_new_eos (); - - GST_OBJECT_LOCK (self); - self->priv->send_eos = FALSE; - gst_event_set_seqnum (event, self->priv->seqnum); - GST_OBJECT_UNLOCK (self); - - gst_pad_push_event (self->srcpad, event); -} - -static GstClockTime -gst_aggregator_get_next_time (GstAggregator * self) -{ - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); - - if (klass->get_next_time) - return klass->get_next_time (self); - - return GST_CLOCK_TIME_NONE; -} - -static gboolean -gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) -{ - GstClockTime latency; - GstClockTime start; - gboolean res; - - *timeout = FALSE; - - SRC_LOCK (self); - - latency = gst_aggregator_get_latency_unlocked (self); - - if (gst_aggregator_check_pads_ready (self)) { - GST_DEBUG_OBJECT (self, "all pads have data"); - SRC_UNLOCK (self); - - return TRUE; - } - - /* Before waiting, check if we're actually still running */ - if (!self->priv->running || !self->priv->send_eos) { - SRC_UNLOCK (self); - - return FALSE; - } - - start = gst_aggregator_get_next_time (self); - - /* If we're not live, or if we use the running time - * of the first buffer as start time, we wait until - * all pads have buffers. - * Otherwise (i.e. if we are live!), we wait on the clock - * and if a pad does not have a buffer in time we ignore - * that pad. - */ - GST_OBJECT_LOCK (self); - if (!GST_CLOCK_TIME_IS_VALID (latency) || - !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) || - !GST_CLOCK_TIME_IS_VALID (start) || - (self->priv->first_buffer - && self->priv->start_time_selection == - GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) { - /* We wake up here when something happened, and below - * then check if we're ready now. If we return FALSE, - * we will be directly called again. - */ - GST_OBJECT_UNLOCK (self); - SRC_WAIT (self); - } else { - GstClockTime base_time, time; - GstClock *clock; - GstClockReturn status; - GstClockTimeDiff jitter; - - GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT, - GST_TIME_ARGS (start)); - - base_time = GST_ELEMENT_CAST (self)->base_time; - clock = gst_object_ref (GST_ELEMENT_CLOCK (self)); - GST_OBJECT_UNLOCK (self); - - time = base_time + start; - time += latency; - - GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %" - GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT - " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")", - GST_TIME_ARGS (time), - GST_TIME_ARGS (base_time), - GST_TIME_ARGS (start), GST_TIME_ARGS (latency), - GST_TIME_ARGS (gst_clock_get_time (clock))); - - self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time); - gst_object_unref (clock); - SRC_UNLOCK (self); - - jitter = 0; - status = gst_clock_id_wait (self->priv->aggregate_id, &jitter); - - SRC_LOCK (self); - if (self->priv->aggregate_id) { - gst_clock_id_unref (self->priv->aggregate_id); - self->priv->aggregate_id = NULL; - } - - GST_DEBUG_OBJECT (self, - "clock returned %d (jitter: %" GST_STIME_FORMAT ")", - status, GST_STIME_ARGS (jitter)); - - /* we timed out */ - if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) { - SRC_UNLOCK (self); - *timeout = TRUE; - return TRUE; - } - } - - res = gst_aggregator_check_pads_ready (self); - SRC_UNLOCK (self); - - return res; -} - -static gboolean -gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, - gpointer user_data) -{ - GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad); - GstAggregator *aggregator = GST_AGGREGATOR_CAST (self); - GstEvent *event = NULL; - GstQuery *query = NULL; - GstAggregatorClass *klass = NULL; - gboolean *processed_event = user_data; - - do { - event = NULL; - query = NULL; - - PAD_LOCK (pad); - if (pad->priv->clipped_buffer == NULL && - !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { - if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))) - event = gst_event_ref (g_queue_peek_tail (&pad->priv->data)); - if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data))) - query = g_queue_peek_tail (&pad->priv->data); - } - PAD_UNLOCK (pad); - if (event || query) { - gboolean ret; - - if (processed_event) - *processed_event = TRUE; - if (klass == NULL) - klass = GST_AGGREGATOR_GET_CLASS (self); - - if (event) { - GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); - gst_event_ref (event); - ret = klass->sink_event (aggregator, pad, event); - - PAD_LOCK (pad); - if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) - pad->priv->negotiated = ret; - if (g_queue_peek_tail (&pad->priv->data) == event) - gst_event_unref (g_queue_pop_tail (&pad->priv->data)); - gst_event_unref (event); - } else if (query) { - GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query); - ret = klass->sink_query (aggregator, pad, query); - - PAD_LOCK (pad); - if (g_queue_peek_tail (&pad->priv->data) == query) { - GstStructure *s; - - s = gst_query_writable_structure (query); - gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret, - NULL); - g_queue_pop_tail (&pad->priv->data); - } - } - - PAD_BROADCAST_EVENT (pad); - PAD_UNLOCK (pad); - } - } while (event || query); - - return TRUE; -} - -static void -gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, - GstFlowReturn flow_return, gboolean full) -{ - GList *item; - - PAD_LOCK (aggpad); - if (flow_return == GST_FLOW_NOT_LINKED) - aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return); - else - aggpad->priv->flow_return = flow_return; - - item = g_queue_peek_head_link (&aggpad->priv->data); - while (item) { - GList *next = item->next; - - /* In partial flush, we do like the pad, we get rid of non-sticky events - * and EOS/SEGMENT. - */ - if (full || GST_IS_BUFFER (item->data) || - GST_EVENT_TYPE (item->data) == GST_EVENT_EOS || - GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT || - !GST_EVENT_IS_STICKY (item->data)) { - if (!GST_IS_QUERY (item->data)) - gst_mini_object_unref (item->data); - g_queue_delete_link (&aggpad->priv->data, item); - } - item = next; - } - aggpad->priv->num_buffers = 0; - gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL); - - PAD_BROADCAST_EVENT (aggpad); - PAD_UNLOCK (aggpad); -} - -static GstFlowReturn -gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps, - GstCaps ** ret) -{ - *ret = gst_caps_ref (caps); - - return GST_FLOW_OK; -} - -static GstCaps * -gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps) -{ - caps = gst_caps_fixate (caps); - - return caps; -} - -static gboolean -gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps) -{ - return TRUE; -} - - -/* takes ownership of the pool, allocator and query */ -static gboolean -gst_aggregator_set_allocation (GstAggregator * self, - GstBufferPool * pool, GstAllocator * allocator, - GstAllocationParams * params, GstQuery * query) -{ - GstAllocator *oldalloc; - GstBufferPool *oldpool; - GstQuery *oldquery; - - GST_DEBUG ("storing allocation query"); - - GST_OBJECT_LOCK (self); - oldpool = self->priv->pool; - self->priv->pool = pool; - - oldalloc = self->priv->allocator; - self->priv->allocator = allocator; - - oldquery = self->priv->allocation_query; - self->priv->allocation_query = query; - - if (params) - self->priv->allocation_params = *params; - else - gst_allocation_params_init (&self->priv->allocation_params); - GST_OBJECT_UNLOCK (self); - - if (oldpool) { - GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool); - gst_buffer_pool_set_active (oldpool, FALSE); - gst_object_unref (oldpool); - } - if (oldalloc) { - gst_object_unref (oldalloc); - } - if (oldquery) { - gst_query_unref (oldquery); - } - return TRUE; -} - - -static gboolean -gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query) -{ - GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self); - - if (aggclass->decide_allocation) - if (!aggclass->decide_allocation (self, query)) - return FALSE; - - return TRUE; -} - -static gboolean -gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps) -{ - GstQuery *query; - gboolean result = TRUE; - GstBufferPool *pool = NULL; - GstAllocator *allocator; - GstAllocationParams params; - - /* find a pool for the negotiated caps now */ - GST_DEBUG_OBJECT (self, "doing allocation query"); - query = gst_query_new_allocation (caps, TRUE); - if (!gst_pad_peer_query (self->srcpad, query)) { - /* not a problem, just debug a little */ - GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed"); - } - - GST_DEBUG_OBJECT (self, "calling decide_allocation"); - result = gst_aggregator_decide_allocation (self, query); - - GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result, - query); - - if (!result) - goto no_decide_allocation; - - /* we got configuration from our peer or the decide_allocation method, - * parse them */ - if (gst_query_get_n_allocation_params (query) > 0) { - gst_query_parse_nth_allocation_param (query, 0, &allocator, ¶ms); - } else { - allocator = NULL; - gst_allocation_params_init (¶ms); - } - - if (gst_query_get_n_allocation_pools (query) > 0) - gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL); - - /* now store */ - result = - gst_aggregator_set_allocation (self, pool, allocator, ¶ms, query); - - return result; - - /* Errors */ -no_decide_allocation: - { - GST_WARNING_OBJECT (self, "Failed to decide allocation"); - gst_query_unref (query); - - return result; - } - -} - -/* WITH SRC_LOCK held */ -static GstFlowReturn -gst_aggregator_update_src_caps (GstAggregator * self) -{ - GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self); - GstCaps *downstream_caps, *template_caps, *caps = NULL; - GstFlowReturn ret = GST_FLOW_OK; - - template_caps = gst_pad_get_pad_template_caps (self->srcpad); - downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps); - - if (gst_caps_is_empty (downstream_caps)) { - GST_INFO_OBJECT (self, "Downstream caps (%" - GST_PTR_FORMAT ") not compatible with pad template caps (%" - GST_PTR_FORMAT ")", downstream_caps, template_caps); - ret = GST_FLOW_NOT_NEGOTIATED; - goto done; - } - - g_assert (agg_klass->update_src_caps); - GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT, - downstream_caps); - ret = agg_klass->update_src_caps (self, downstream_caps, &caps); - if (ret < GST_FLOW_OK) { - GST_WARNING_OBJECT (self, "Subclass failed to update provided caps"); - goto done; - } - if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) { - ret = GST_FLOW_NOT_NEGOTIATED; - goto done; - } - GST_DEBUG_OBJECT (self, " to %" GST_PTR_FORMAT, caps); - -#ifdef GST_ENABLE_EXTRA_CHECKS - if (!gst_caps_is_subset (caps, template_caps)) { - GstCaps *intersection; - - GST_ERROR_OBJECT (self, - "update_src_caps returned caps %" GST_PTR_FORMAT - " which are not a real subset of the template caps %" - GST_PTR_FORMAT, caps, template_caps); - g_warning ("%s: update_src_caps returned caps which are not a real " - "subset of the filter caps", GST_ELEMENT_NAME (self)); - - intersection = - gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST); - gst_caps_unref (caps); - caps = intersection; - } -#endif - - if (gst_caps_is_any (caps)) { - goto done; - } - - if (!gst_caps_is_fixed (caps)) { - g_assert (agg_klass->fixate_src_caps); - - GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps); - if (!(caps = agg_klass->fixate_src_caps (self, caps))) { - GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps"); - ret = GST_FLOW_NOT_NEGOTIATED; - goto done; - } - GST_DEBUG_OBJECT (self, " to %" GST_PTR_FORMAT, caps); - } - - if (agg_klass->negotiated_src_caps) { - if (!agg_klass->negotiated_src_caps (self, caps)) { - GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps"); - ret = GST_FLOW_NOT_NEGOTIATED; - goto done; - } - } - - gst_aggregator_set_src_caps (self, caps); - - if (!gst_aggregator_do_allocation (self, caps)) { - GST_WARNING_OBJECT (self, "Allocation negotiation failed"); - ret = GST_FLOW_NOT_NEGOTIATED; - } - -done: - gst_caps_unref (downstream_caps); - gst_caps_unref (template_caps); - - if (caps) - gst_caps_unref (caps); - - return ret; -} - -static void -gst_aggregator_aggregate_func (GstAggregator * self) -{ - GstAggregatorPrivate *priv = self->priv; - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); - gboolean timeout = FALSE; - - if (self->priv->running == FALSE) { - GST_DEBUG_OBJECT (self, "Not running anymore"); - return; - } - - GST_LOG_OBJECT (self, "Checking aggregate"); - while (priv->send_eos && priv->running) { - GstFlowReturn flow_return = GST_FLOW_OK; - gboolean processed_event = FALSE; - - gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), - gst_aggregator_do_events_and_queries, NULL); - - /* Ensure we have buffers ready (either in clipped_buffer or at the head of - * the queue */ - if (!gst_aggregator_wait_and_check (self, &timeout)) - continue; - - gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), - gst_aggregator_do_events_and_queries, &processed_event); - - if (processed_event) - continue; - - if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) { - flow_return = gst_aggregator_update_src_caps (self); - if (flow_return != GST_FLOW_OK) - gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self)); - } - - if (timeout || flow_return >= GST_FLOW_OK) { - GST_TRACE_OBJECT (self, "Actually aggregating!"); - flow_return = klass->aggregate (self, timeout); - } - - if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA) - continue; - - GST_OBJECT_LOCK (self); - if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) { - /* We don't want to set the pads to flushing, but we want to - * stop the thread, so just break here */ - GST_OBJECT_UNLOCK (self); - break; - } - GST_OBJECT_UNLOCK (self); - - if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) { - gst_aggregator_push_eos (self); - } - - GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return)); - - if (flow_return != GST_FLOW_OK) { - GList *item; - - GST_OBJECT_LOCK (self); - for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) { - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); - - gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE); - } - GST_OBJECT_UNLOCK (self); - break; - } - } - - /* Pause the task here, the only ways to get here are: - * 1) We're stopping, in which case the task is stopped anyway - * 2) We got a flow error above, in which case it might take - * some time to forward the flow return upstream and we - * would otherwise call the task function over and over - * again without doing anything - */ - gst_pad_pause_task (self->srcpad); -} - -static gboolean -gst_aggregator_start (GstAggregator * self) -{ - GstAggregatorClass *klass; - gboolean result; - - self->priv->send_stream_start = TRUE; - self->priv->send_segment = TRUE; - self->priv->send_eos = TRUE; - self->priv->srccaps = NULL; - - gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL); - - klass = GST_AGGREGATOR_GET_CLASS (self); - - if (klass->start) - result = klass->start (self); - else - result = TRUE; - - return result; -} - -static gboolean -_check_pending_flush_stop (GstAggregatorPad * pad) -{ - gboolean res; - - PAD_LOCK (pad); - res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start); - PAD_UNLOCK (pad); - - return res; -} - -static gboolean -gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start) -{ - gboolean res = TRUE; - - GST_INFO_OBJECT (self, "%s srcpad task", - flush_start ? "Pausing" : "Stopping"); - - SRC_LOCK (self); - self->priv->running = FALSE; - SRC_BROADCAST (self); - SRC_UNLOCK (self); - - if (flush_start) { - res = gst_pad_push_event (self->srcpad, flush_start); - } - - gst_pad_stop_task (self->srcpad); - - return res; -} - -static void -gst_aggregator_start_srcpad_task (GstAggregator * self) -{ - GST_INFO_OBJECT (self, "Starting srcpad task"); - - self->priv->running = TRUE; - gst_pad_start_task (GST_PAD (self->srcpad), - (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL); -} - -static GstFlowReturn -gst_aggregator_flush (GstAggregator * self) -{ - GstFlowReturn ret = GST_FLOW_OK; - GstAggregatorPrivate *priv = self->priv; - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); - - GST_DEBUG_OBJECT (self, "Flushing everything"); - GST_OBJECT_LOCK (self); - priv->send_segment = TRUE; - priv->flush_seeking = FALSE; - priv->tags_changed = FALSE; - GST_OBJECT_UNLOCK (self); - if (klass->flush) - ret = klass->flush (self); - - return ret; -} - - -/* Called with GstAggregator's object lock held */ - -static gboolean -gst_aggregator_all_flush_stop_received_locked (GstAggregator * self) -{ - GList *tmp; - GstAggregatorPad *tmppad; - - for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) { - tmppad = (GstAggregatorPad *) tmp->data; - - if (_check_pending_flush_stop (tmppad) == FALSE) { - GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i", - tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop); - return FALSE; - } - } - - return TRUE; -} - -static void -gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, - GstEvent * event) -{ - GstAggregatorPrivate *priv = self->priv; - GstAggregatorPadPrivate *padpriv = aggpad->priv; - - gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE); - - PAD_FLUSH_LOCK (aggpad); - PAD_LOCK (aggpad); - if (padpriv->pending_flush_start) { - GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now"); - - padpriv->pending_flush_start = FALSE; - padpriv->pending_flush_stop = TRUE; - } - PAD_UNLOCK (aggpad); - - GST_OBJECT_LOCK (self); - if (priv->flush_seeking) { - /* If flush_seeking we forward the first FLUSH_START */ - if (priv->pending_flush_start) { - priv->pending_flush_start = FALSE; - GST_OBJECT_UNLOCK (self); - - GST_INFO_OBJECT (self, "Flushing, pausing srcpad task"); - gst_aggregator_stop_srcpad_task (self, event); - - GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking"); - GST_PAD_STREAM_LOCK (self->srcpad); - GST_LOG_OBJECT (self, "GOT STREAM_LOCK"); - event = NULL; - } else { - GST_OBJECT_UNLOCK (self); - gst_event_unref (event); - } - } else { - GST_OBJECT_UNLOCK (self); - gst_event_unref (event); - } - PAD_FLUSH_UNLOCK (aggpad); -} - -/* Must be called with the the PAD_LOCK held */ -static void -update_time_level (GstAggregatorPad * aggpad, gboolean head) -{ - GstAggregatorPadPrivate *priv = aggpad->priv; - - if (head) { - if (GST_CLOCK_TIME_IS_VALID (priv->head_position) && - priv->head_segment.format == GST_FORMAT_TIME) - priv->head_time = gst_segment_to_running_time (&priv->head_segment, - GST_FORMAT_TIME, priv->head_position); - else - priv->head_time = GST_CLOCK_TIME_NONE; - - if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time)) - priv->tail_time = priv->head_time; - } else { - if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) && - aggpad->segment.format == GST_FORMAT_TIME) - priv->tail_time = gst_segment_to_running_time (&aggpad->segment, - GST_FORMAT_TIME, priv->tail_position); - else - priv->tail_time = priv->head_time; - } - - if (priv->head_time == GST_CLOCK_TIME_NONE || - priv->tail_time == GST_CLOCK_TIME_NONE) { - priv->time_level = 0; - return; - } - - if (priv->tail_time > priv->head_time) - priv->time_level = 0; - else - priv->time_level = priv->head_time - priv->tail_time; -} - - -/* GstAggregator vmethods default implementations */ -static gboolean -gst_aggregator_default_sink_event (GstAggregator * self, - GstAggregatorPad * aggpad, GstEvent * event) -{ - gboolean res = TRUE; - GstPad *pad = GST_PAD (aggpad); - GstAggregatorPrivate *priv = self->priv; - - GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event); - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_FLUSH_START: - { - gst_aggregator_flush_start (self, aggpad, event); - /* We forward only in one case: right after flush_seeking */ - event = NULL; - goto eat; - } - case GST_EVENT_FLUSH_STOP: - { - gst_aggregator_pad_flush (aggpad, self); - GST_OBJECT_LOCK (self); - if (priv->flush_seeking) { - g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE); - if (gst_aggregator_all_flush_stop_received_locked (self)) { - GST_OBJECT_UNLOCK (self); - /* That means we received FLUSH_STOP/FLUSH_STOP on - * all sinkpads -- Seeking is Done... sending FLUSH_STOP */ - gst_aggregator_flush (self); - gst_pad_push_event (self->srcpad, event); - event = NULL; - SRC_LOCK (self); - priv->send_eos = TRUE; - SRC_BROADCAST (self); - SRC_UNLOCK (self); - - GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK"); - GST_PAD_STREAM_UNLOCK (self->srcpad); - gst_aggregator_start_srcpad_task (self); - } else { - GST_OBJECT_UNLOCK (self); - } - } else { - GST_OBJECT_UNLOCK (self); - } - - /* We never forward the event */ - goto eat; - } - case GST_EVENT_EOS: - { - SRC_LOCK (self); - PAD_LOCK (aggpad); - g_assert (aggpad->priv->num_buffers == 0); - aggpad->priv->eos = TRUE; - PAD_UNLOCK (aggpad); - SRC_BROADCAST (self); - SRC_UNLOCK (self); - goto eat; - } - case GST_EVENT_SEGMENT: - { - PAD_LOCK (aggpad); - GST_OBJECT_LOCK (aggpad); - gst_event_copy_segment (event, &aggpad->segment); - /* We've got a new segment, tail_position is now meaningless - * and may interfere with the time_level calculation - */ - aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; - update_time_level (aggpad, FALSE); - GST_OBJECT_UNLOCK (aggpad); - PAD_UNLOCK (aggpad); - - GST_OBJECT_LOCK (self); - self->priv->seqnum = gst_event_get_seqnum (event); - GST_OBJECT_UNLOCK (self); - goto eat; - } - case GST_EVENT_STREAM_START: - { - goto eat; - } - case GST_EVENT_GAP: - { - GstClockTime pts, endpts; - GstClockTime duration; - GstBuffer *gapbuf; - - gst_event_parse_gap (event, &pts, &duration); - gapbuf = gst_buffer_new (); - - if (GST_CLOCK_TIME_IS_VALID (duration)) - endpts = pts + duration; - else - endpts = GST_CLOCK_TIME_NONE; - - GST_OBJECT_LOCK (aggpad); - res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts, - &pts, &endpts); - GST_OBJECT_UNLOCK (aggpad); - - if (!res) { - GST_WARNING_OBJECT (self, "GAP event outside segment, dropping"); - goto eat; - } - - if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts)) - duration = endpts - pts; - else - duration = GST_CLOCK_TIME_NONE; - - GST_BUFFER_PTS (gapbuf) = pts; - GST_BUFFER_DURATION (gapbuf) = duration; - GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); - GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); - - /* Remove GAP event so we can replace it with the buffer */ - if (g_queue_peek_tail (&aggpad->priv->data) == event) - gst_event_unref (g_queue_pop_tail (&aggpad->priv->data)); - - if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) != - GST_FLOW_OK) { - GST_WARNING_OBJECT (self, "Failed to chain gap buffer"); - res = FALSE; - } - - goto eat; - } - case GST_EVENT_TAG: - { - GstTagList *tags; - - gst_event_parse_tag (event, &tags); - - if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) { - gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE); - gst_event_unref (event); - event = NULL; - goto eat; - } - break; - } - default: - { - break; - } - } - - GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event); - return gst_pad_event_default (pad, GST_OBJECT (self), event); - -eat: - GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event); - if (event) - gst_event_unref (event); - - return res; -} - -static gboolean -gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data) -{ - GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad); - GstAggregator *agg = GST_AGGREGATOR_CAST (self); - - gst_aggregator_pad_flush (pad, agg); - - PAD_LOCK (pad); - pad->priv->flow_return = GST_FLOW_FLUSHING; - pad->priv->negotiated = FALSE; - PAD_BROADCAST_EVENT (pad); - PAD_UNLOCK (pad); - - return TRUE; -} - -static gboolean -gst_aggregator_stop (GstAggregator * agg) -{ - GstAggregatorClass *klass; - gboolean result; - - gst_aggregator_reset_flow_values (agg); - - /* Application needs to make sure no pads are added while it shuts us down */ - gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg), - gst_aggregator_stop_pad, NULL); - - klass = GST_AGGREGATOR_GET_CLASS (agg); - - if (klass->stop) - result = klass->stop (agg); - else - result = TRUE; - - agg->priv->has_peer_latency = FALSE; - agg->priv->peer_latency_live = FALSE; - agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0; - - if (agg->priv->tags) - gst_tag_list_unref (agg->priv->tags); - agg->priv->tags = NULL; - - gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL); - - return result; -} - -/* GstElement vmethods implementations */ -static GstStateChangeReturn -gst_aggregator_change_state (GstElement * element, GstStateChange transition) -{ - GstStateChangeReturn ret; - GstAggregator *self = GST_AGGREGATOR (element); - - switch (transition) { - case GST_STATE_CHANGE_READY_TO_PAUSED: - if (!gst_aggregator_start (self)) - goto error_start; - break; - default: - break; - } - - if ((ret = - GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element, - transition)) == GST_STATE_CHANGE_FAILURE) - goto failure; - - - switch (transition) { - case GST_STATE_CHANGE_PAUSED_TO_READY: - if (!gst_aggregator_stop (self)) { - /* What to do in this case? Error out? */ - GST_ERROR_OBJECT (self, "Subclass failed to stop."); - } - break; - default: - break; - } - - return ret; - -/* ERRORS */ -failure: - { - GST_ERROR_OBJECT (element, "parent failed state change"); - return ret; - } -error_start: - { - GST_ERROR_OBJECT (element, "Subclass failed to start"); - return GST_STATE_CHANGE_FAILURE; - } -} - -static void -gst_aggregator_release_pad (GstElement * element, GstPad * pad) -{ - GstAggregator *self = GST_AGGREGATOR (element); - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - - GST_INFO_OBJECT (pad, "Removing pad"); - - SRC_LOCK (self); - gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); - gst_element_remove_pad (element, pad); - - self->priv->has_peer_latency = FALSE; - SRC_BROADCAST (self); - SRC_UNLOCK (self); -} - -static GstAggregatorPad * -gst_aggregator_default_create_new_pad (GstAggregator * self, - GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps) -{ - GstAggregatorPad *agg_pad; - GstAggregatorPrivate *priv = self->priv; - gint serial = 0; - gchar *name = NULL; - GType pad_type = - GST_PAD_TEMPLATE_GTYPE (templ) == - G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ); - - if (templ->direction != GST_PAD_SINK) - goto not_sink; - - if (templ->presence != GST_PAD_REQUEST) - goto not_request; - - GST_OBJECT_LOCK (self); - if (req_name == NULL || strlen (req_name) < 6 - || !g_str_has_prefix (req_name, "sink_")) { - /* no name given when requesting the pad, use next available int */ - serial = ++priv->max_padserial; - } else { - /* parse serial number from requested padname */ - serial = g_ascii_strtoull (&req_name[5], NULL, 10); - if (serial > priv->max_padserial) - priv->max_padserial = serial; - } - - name = g_strdup_printf ("sink_%u", serial); - agg_pad = g_object_new (pad_type, - "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); - g_free (name); - - GST_OBJECT_UNLOCK (self); - - return agg_pad; - - /* errors */ -not_sink: - { - GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad"); - return NULL; - } -not_request: - { - GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad"); - return NULL; - } -} - -static GstPad * -gst_aggregator_request_new_pad (GstElement * element, - GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps) -{ - GstAggregator *self; - GstAggregatorPad *agg_pad; - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element); - GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv; - - self = GST_AGGREGATOR (element); - - agg_pad = klass->create_new_pad (self, templ, req_name, caps); - if (!agg_pad) { - GST_ERROR_OBJECT (element, "Couldn't create new pad"); - return NULL; - } - - GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad)); - - if (priv->running) - gst_pad_set_active (GST_PAD (agg_pad), TRUE); - - /* add the pad to the element */ - gst_element_add_pad (element, GST_PAD (agg_pad)); - - return GST_PAD (agg_pad); -} - -/* Must be called with SRC_LOCK held */ - -static gboolean -gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) -{ - gboolean query_ret, live; - GstClockTime our_latency, min, max; - - query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query); - - if (!query_ret) { - GST_WARNING_OBJECT (self, "Latency query failed"); - return FALSE; - } - - gst_query_parse_latency (query, &live, &min, &max); - - if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) { - GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT - ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min)); - return FALSE; - } - - if (min > max && GST_CLOCK_TIME_IS_VALID (max)) { - GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL), - ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %" - GST_TIME_FORMAT ". Add queues or other buffering elements.", - GST_TIME_ARGS (max), GST_TIME_ARGS (min))); - return FALSE; - } - - our_latency = self->priv->latency; - - self->priv->peer_latency_live = live; - self->priv->peer_latency_min = min; - self->priv->peer_latency_max = max; - self->priv->has_peer_latency = TRUE; - - /* add our own */ - min += our_latency; - min += self->priv->sub_latency_min; - if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max) - && GST_CLOCK_TIME_IS_VALID (max)) - max += self->priv->sub_latency_max + our_latency; - else - max = GST_CLOCK_TIME_NONE; - - SRC_BROADCAST (self); - - GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT - " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max); - - gst_query_set_latency (query, live, min, max); - - return query_ret; -} - -/* - * MUST be called with the src_lock held. - * - * See gst_aggregator_get_latency() for doc - */ -static GstClockTime -gst_aggregator_get_latency_unlocked (GstAggregator * self) -{ - GstClockTime latency; - - g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0); - - if (!self->priv->has_peer_latency) { - GstQuery *query = gst_query_new_latency (); - gboolean ret; - - ret = gst_aggregator_query_latency_unlocked (self, query); - gst_query_unref (query); - if (!ret) - return GST_CLOCK_TIME_NONE; - } - - if (!self->priv->has_peer_latency || !self->priv->peer_latency_live) - return GST_CLOCK_TIME_NONE; - - /* latency_min is never GST_CLOCK_TIME_NONE by construction */ - latency = self->priv->peer_latency_min; - - /* add our own */ - latency += self->priv->latency; - latency += self->priv->sub_latency_min; - - return latency; -} - -/** - * gst_aggregator_get_latency: - * @self: a #GstAggregator - * - * Retrieves the latency values reported by @self in response to the latency - * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element - * will not wait for the clock. - * - * Typically only called by subclasses. - * - * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync - */ -GstClockTime -gst_aggregator_get_latency (GstAggregator * self) -{ - GstClockTime ret; - - SRC_LOCK (self); - ret = gst_aggregator_get_latency_unlocked (self); - SRC_UNLOCK (self); - - return ret; -} - -static gboolean -gst_aggregator_send_event (GstElement * element, GstEvent * event) -{ - GstAggregator *self = GST_AGGREGATOR (element); - - GST_STATE_LOCK (element); - if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK && - GST_STATE (element) < GST_STATE_PAUSED) { - gdouble rate; - GstFormat fmt; - GstSeekFlags flags; - GstSeekType start_type, stop_type; - gint64 start, stop; - - gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type, - &start, &stop_type, &stop); - - GST_OBJECT_LOCK (self); - gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, - stop_type, stop, NULL); - self->priv->seqnum = gst_event_get_seqnum (event); - self->priv->first_buffer = FALSE; - GST_OBJECT_UNLOCK (self); - - GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event); - } - GST_STATE_UNLOCK (element); - - - return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element, - event); -} - -static gboolean -gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query) -{ - gboolean res = TRUE; - - switch (GST_QUERY_TYPE (query)) { - case GST_QUERY_SEEKING: - { - GstFormat format; - - /* don't pass it along as some (file)sink might claim it does - * whereas with a collectpads in between that will not likely work */ - gst_query_parse_seeking (query, &format, NULL, NULL, NULL); - gst_query_set_seeking (query, format, FALSE, 0, -1); - res = TRUE; - - break; - } - case GST_QUERY_LATENCY: - SRC_LOCK (self); - res = gst_aggregator_query_latency_unlocked (self, query); - SRC_UNLOCK (self); - break; - default: - return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query); - } - - return res; -} - -static gboolean -gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) -{ - EventData *evdata = user_data; - gboolean ret = TRUE; - GstPad *peer = gst_pad_get_peer (pad); - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - - if (peer) { - if (evdata->only_to_active_pads && aggpad->priv->first_buffer) { - GST_DEBUG_OBJECT (pad, "not sending event to inactive pad"); - ret = TRUE; - } else { - ret = gst_pad_send_event (peer, gst_event_ref (evdata->event)); - GST_DEBUG_OBJECT (pad, "return of event push is %d", ret); - gst_object_unref (peer); - } - } - - if (ret == FALSE) { - if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) { - GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME); - - GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event); - - if (gst_pad_query (peer, seeking)) { - gboolean seekable; - - gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL); - - if (seekable == FALSE) { - GST_INFO_OBJECT (pad, - "Source not seekable, We failed but it does not matter!"); - - ret = TRUE; - } - } else { - GST_ERROR_OBJECT (pad, "Query seeking FAILED"); - } - - gst_query_unref (seeking); - } - - if (evdata->flush) { - PAD_LOCK (aggpad); - aggpad->priv->pending_flush_start = FALSE; - aggpad->priv->pending_flush_stop = FALSE; - PAD_UNLOCK (aggpad); - } - } else { - evdata->one_actually_seeked = TRUE; - } - - evdata->result &= ret; - - /* Always send to all pads */ - return FALSE; -} - -static void -gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self, - EventData * evdata) -{ - evdata->result = TRUE; - evdata->one_actually_seeked = FALSE; - - /* We first need to set all pads as flushing in a first pass - * as flush_start flush_stop is sometimes sent synchronously - * while we send the seek event */ - if (evdata->flush) { - GList *l; - - GST_OBJECT_LOCK (self); - for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) { - GstAggregatorPad *pad = l->data; - - PAD_LOCK (pad); - pad->priv->pending_flush_start = TRUE; - pad->priv->pending_flush_stop = FALSE; - PAD_UNLOCK (pad); - } - GST_OBJECT_UNLOCK (self); - } - - gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata); - - gst_event_unref (evdata->event); -} - -static gboolean -gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) -{ - gdouble rate; - GstFormat fmt; - GstSeekFlags flags; - GstSeekType start_type, stop_type; - gint64 start, stop; - gboolean flush; - EventData evdata = { 0, }; - GstAggregatorPrivate *priv = self->priv; - - gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type, - &start, &stop_type, &stop); - - GST_INFO_OBJECT (self, "starting SEEK"); - - flush = flags & GST_SEEK_FLAG_FLUSH; - - GST_OBJECT_LOCK (self); - if (flush) { - priv->pending_flush_start = TRUE; - priv->flush_seeking = TRUE; - } - - gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, - stop_type, stop, NULL); - - /* Seeking sets a position */ - self->priv->first_buffer = FALSE; - GST_OBJECT_UNLOCK (self); - - /* forward the seek upstream */ - evdata.event = event; - evdata.flush = flush; - evdata.only_to_active_pads = FALSE; - gst_aggregator_forward_event_to_all_sinkpads (self, &evdata); - event = NULL; - - if (!evdata.result || !evdata.one_actually_seeked) { - GST_OBJECT_LOCK (self); - priv->flush_seeking = FALSE; - priv->pending_flush_start = FALSE; - GST_OBJECT_UNLOCK (self); - } - - GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result); - - return evdata.result; -} - -static gboolean -gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event) -{ - EventData evdata = { 0, }; - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_SEEK: - /* _do_seek() unrefs the event. */ - return gst_aggregator_do_seek (self, event); - case GST_EVENT_NAVIGATION: - /* navigation is rather pointless. */ - gst_event_unref (event); - return FALSE; - default: - break; - } - - /* Don't forward QOS events to pads that had no active buffer yet. Otherwise - * they will receive a QOS event that has earliest_time=0 (because we can't - * have negative timestamps), and consider their buffer as too late */ - evdata.event = event; - evdata.flush = FALSE; - evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS; - gst_aggregator_forward_event_to_all_sinkpads (self, &evdata); - return evdata.result; -} - -static gboolean -gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent, - GstEvent * event) -{ - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - - return klass->src_event (GST_AGGREGATOR (parent), event); -} - -static gboolean -gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent, - GstQuery * query) -{ - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - - return klass->src_query (GST_AGGREGATOR (parent), query); -} - -static gboolean -gst_aggregator_src_pad_activate_mode_func (GstPad * pad, - GstObject * parent, GstPadMode mode, gboolean active) -{ - GstAggregator *self = GST_AGGREGATOR (parent); - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - - if (klass->src_activate) { - if (klass->src_activate (self, mode, active) == FALSE) { - return FALSE; - } - } - - if (active == TRUE) { - switch (mode) { - case GST_PAD_MODE_PUSH: - { - GST_INFO_OBJECT (pad, "Activating pad!"); - gst_aggregator_start_srcpad_task (self); - return TRUE; - } - default: - { - GST_ERROR_OBJECT (pad, "Only supported mode is PUSH"); - return FALSE; - } - } - } - - /* deactivating */ - GST_INFO_OBJECT (self, "Deactivating srcpad"); - gst_aggregator_stop_srcpad_task (self, FALSE); - - return TRUE; -} - -static gboolean -gst_aggregator_default_sink_query (GstAggregator * self, - GstAggregatorPad * aggpad, GstQuery * query) -{ - GstPad *pad = GST_PAD (aggpad); - - if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) { - GstQuery *decide_query = NULL; - GstAggregatorClass *agg_class; - gboolean ret; - - GST_OBJECT_LOCK (self); - PAD_LOCK (aggpad); - if (G_UNLIKELY (!aggpad->priv->negotiated)) { - GST_DEBUG_OBJECT (self, - "not negotiated yet, can't answer ALLOCATION query"); - PAD_UNLOCK (aggpad); - GST_OBJECT_UNLOCK (self); - - return FALSE; - } - - if ((decide_query = self->priv->allocation_query)) - gst_query_ref (decide_query); - PAD_UNLOCK (aggpad); - GST_OBJECT_UNLOCK (self); - - GST_DEBUG_OBJECT (self, - "calling propose allocation with query %" GST_PTR_FORMAT, decide_query); - - agg_class = GST_AGGREGATOR_GET_CLASS (self); - - /* pass the query to the propose_allocation vmethod if any */ - if (agg_class->propose_allocation) - ret = agg_class->propose_allocation (self, aggpad, decide_query, query); - else - ret = FALSE; - - if (decide_query) - gst_query_unref (decide_query); - - GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query); - return ret; - } - - return gst_pad_query_default (pad, GST_OBJECT (self), query); -} - -static void -gst_aggregator_finalize (GObject * object) -{ - GstAggregator *self = (GstAggregator *) object; - - g_mutex_clear (&self->priv->src_lock); - g_cond_clear (&self->priv->src_cond); - - G_OBJECT_CLASS (aggregator_parent_class)->finalize (object); -} - -/* - * gst_aggregator_set_latency_property: - * @agg: a #GstAggregator - * @latency: the new latency value (in nanoseconds). - * - * Sets the new latency value to @latency. This value is used to limit the - * amount of time a pad waits for data to appear before considering the pad - * as unresponsive. - */ -static void -gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency) -{ - gboolean changed; - - g_return_if_fail (GST_IS_AGGREGATOR (self)); - g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency)); - - SRC_LOCK (self); - changed = (self->priv->latency != latency); - - if (changed) { - GList *item; - - GST_OBJECT_LOCK (self); - /* First lock all the pads */ - for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) { - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); - PAD_LOCK (aggpad); - } - - self->priv->latency = latency; - - SRC_BROADCAST (self); - - /* Now wake up the pads */ - for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) { - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); - PAD_BROADCAST_EVENT (aggpad); - PAD_UNLOCK (aggpad); - } - GST_OBJECT_UNLOCK (self); - } - - SRC_UNLOCK (self); - - if (changed) - gst_element_post_message (GST_ELEMENT_CAST (self), - gst_message_new_latency (GST_OBJECT_CAST (self))); -} - -/* - * gst_aggregator_get_latency_property: - * @agg: a #GstAggregator - * - * Gets the latency value. See gst_aggregator_set_latency for - * more details. - * - * Returns: The time in nanoseconds to wait for data to arrive on a sink pad - * before a pad is deemed unresponsive. A value of -1 means an - * unlimited time. - */ -static GstClockTime -gst_aggregator_get_latency_property (GstAggregator * agg) -{ - GstClockTime res; - - g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE); - - GST_OBJECT_LOCK (agg); - res = agg->priv->latency; - GST_OBJECT_UNLOCK (agg); - - return res; -} - -static void -gst_aggregator_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) -{ - GstAggregator *agg = GST_AGGREGATOR (object); - - switch (prop_id) { - case PROP_LATENCY: - gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value)); - break; - case PROP_START_TIME_SELECTION: - agg->priv->start_time_selection = g_value_get_enum (value); - break; - case PROP_START_TIME: - agg->priv->start_time = g_value_get_uint64 (value); - break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static void -gst_aggregator_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) -{ - GstAggregator *agg = GST_AGGREGATOR (object); - - switch (prop_id) { - case PROP_LATENCY: - g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg)); - break; - case PROP_START_TIME_SELECTION: - g_value_set_enum (value, agg->priv->start_time_selection); - break; - case PROP_START_TIME: - g_value_set_uint64 (value, agg->priv->start_time); - break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -/* GObject vmethods implementations */ -static void -gst_aggregator_class_init (GstAggregatorClass * klass) -{ - GObjectClass *gobject_class = (GObjectClass *) klass; - GstElementClass *gstelement_class = (GstElementClass *) klass; - - aggregator_parent_class = g_type_class_peek_parent (klass); - g_type_class_add_private (klass, sizeof (GstAggregatorPrivate)); - - GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator", - GST_DEBUG_FG_MAGENTA, "GstAggregator"); - - klass->sink_event = gst_aggregator_default_sink_event; - klass->sink_query = gst_aggregator_default_sink_query; - - klass->src_event = gst_aggregator_default_src_event; - klass->src_query = gst_aggregator_default_src_query; - - klass->create_new_pad = gst_aggregator_default_create_new_pad; - klass->update_src_caps = gst_aggregator_default_update_src_caps; - klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps; - klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps; - - gstelement_class->request_new_pad = - GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad); - gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event); - gstelement_class->release_pad = - GST_DEBUG_FUNCPTR (gst_aggregator_release_pad); - gstelement_class->change_state = - GST_DEBUG_FUNCPTR (gst_aggregator_change_state); - - gobject_class->set_property = gst_aggregator_set_property; - gobject_class->get_property = gst_aggregator_get_property; - gobject_class->finalize = gst_aggregator_finalize; - - g_object_class_install_property (gobject_class, PROP_LATENCY, - g_param_spec_uint64 ("latency", "Buffer latency", - "Additional latency in live mode to allow upstream " - "to take longer to produce buffers for the current " - "position (in nanoseconds)", 0, G_MAXUINT64, - DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION, - g_param_spec_enum ("start-time-selection", "Start Time Selection", - "Decides which start time is output", - gst_aggregator_start_time_selection_get_type (), - DEFAULT_START_TIME_SELECTION, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_START_TIME, - g_param_spec_uint64 ("start-time", "Start Time", - "Start time to use if start-time-selection=set", 0, - G_MAXUINT64, - DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries); -} - -static void -gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) -{ - GstPadTemplate *pad_template; - GstAggregatorPrivate *priv; - - g_return_if_fail (klass->aggregate != NULL); - - self->priv = - G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR, - GstAggregatorPrivate); - - priv = self->priv; - - pad_template = - gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src"); - g_return_if_fail (pad_template != NULL); - - priv->max_padserial = -1; - priv->tags_changed = FALSE; - - self->priv->peer_latency_live = FALSE; - self->priv->peer_latency_min = self->priv->sub_latency_min = 0; - self->priv->peer_latency_max = self->priv->sub_latency_max = 0; - self->priv->has_peer_latency = FALSE; - gst_aggregator_reset_flow_values (self); - - self->srcpad = gst_pad_new_from_template (pad_template, "src"); - - gst_pad_set_event_function (self->srcpad, - GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func)); - gst_pad_set_query_function (self->srcpad, - GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func)); - gst_pad_set_activatemode_function (self->srcpad, - GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func)); - - gst_element_add_pad (GST_ELEMENT (self), self->srcpad); - - self->priv->latency = DEFAULT_LATENCY; - self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION; - self->priv->start_time = DEFAULT_START_TIME; - - g_mutex_init (&self->priv->src_lock); - g_cond_init (&self->priv->src_cond); -} - -/* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init - * method to get to the padtemplates */ -GType -gst_aggregator_get_type (void) -{ - static volatile gsize type = 0; - - if (g_once_init_enter (&type)) { - GType _type; - static const GTypeInfo info = { - sizeof (GstAggregatorClass), - NULL, - NULL, - (GClassInitFunc) gst_aggregator_class_init, - NULL, - NULL, - sizeof (GstAggregator), - 0, - (GInstanceInitFunc) gst_aggregator_init, - }; - - _type = g_type_register_static (GST_TYPE_ELEMENT, - "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT); - g_once_init_leave (&type, _type); - } - return type; -} - -/* Must be called with SRC lock and PAD lock held */ -static gboolean -gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad) -{ - /* Empty queue always has space */ - if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL) - return TRUE; - - /* We also want at least two buffers, one is being processed and one is ready - * for the next iteration when we operate in live mode. */ - if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2) - return TRUE; - - /* zero latency, if there is a buffer, it's full */ - if (self->priv->latency == 0) - return FALSE; - - /* Allow no more buffers than the latency */ - return (aggpad->priv->time_level <= self->priv->latency); -} - -/* Must be called with the PAD_LOCK held */ -static void -apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) -{ - GstClockTime timestamp; - - if (GST_BUFFER_DTS_IS_VALID (buffer)) - timestamp = GST_BUFFER_DTS (buffer); - else - timestamp = GST_BUFFER_PTS (buffer); - - if (timestamp == GST_CLOCK_TIME_NONE) { - if (head) - timestamp = aggpad->priv->head_position; - else - timestamp = aggpad->priv->tail_position; - } - - /* add duration */ - if (GST_BUFFER_DURATION_IS_VALID (buffer)) - timestamp += GST_BUFFER_DURATION (buffer); - - if (head) - aggpad->priv->head_position = timestamp; - else - aggpad->priv->tail_position = timestamp; - - update_time_level (aggpad, head); -} - -/* - * Can be called either from the sinkpad's chain function or from the srcpad's - * thread in the case of a buffer synthetized from a GAP event. - * Because of this second case, FLUSH_LOCK can't be used here. - */ - -static GstFlowReturn -gst_aggregator_pad_chain_internal (GstAggregator * self, - GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) -{ - GstFlowReturn flow_return; - GstClockTime buf_pts; - - GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); - - PAD_LOCK (aggpad); - flow_return = aggpad->priv->flow_return; - if (flow_return != GST_FLOW_OK) - goto flushing; - - PAD_UNLOCK (aggpad); - - buf_pts = GST_BUFFER_PTS (buffer); - - for (;;) { - SRC_LOCK (self); - GST_OBJECT_LOCK (self); - PAD_LOCK (aggpad); - - if (aggpad->priv->first_buffer) { - self->priv->has_peer_latency = FALSE; - aggpad->priv->first_buffer = FALSE; - } - - if ((gst_aggregator_pad_has_space (self, aggpad) || !head) - && aggpad->priv->flow_return == GST_FLOW_OK) { - if (head) - g_queue_push_head (&aggpad->priv->data, buffer); - else - g_queue_push_tail (&aggpad->priv->data, buffer); - apply_buffer (aggpad, buffer, head); - aggpad->priv->num_buffers++; - buffer = NULL; - SRC_BROADCAST (self); - break; - } - - flow_return = aggpad->priv->flow_return; - if (flow_return != GST_FLOW_OK) { - GST_OBJECT_UNLOCK (self); - SRC_UNLOCK (self); - goto flushing; - } - GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); - GST_OBJECT_UNLOCK (self); - SRC_UNLOCK (self); - PAD_WAIT_EVENT (aggpad); - - PAD_UNLOCK (aggpad); - } - - if (self->priv->first_buffer) { - GstClockTime start_time; - - switch (self->priv->start_time_selection) { - case GST_AGGREGATOR_START_TIME_SELECTION_ZERO: - default: - start_time = 0; - break; - case GST_AGGREGATOR_START_TIME_SELECTION_FIRST: - GST_OBJECT_LOCK (aggpad); - if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) { - start_time = buf_pts; - if (start_time != -1) { - start_time = MAX (start_time, aggpad->priv->head_segment.start); - start_time = - gst_segment_to_running_time (&aggpad->priv->head_segment, - GST_FORMAT_TIME, start_time); - } - } else { - start_time = 0; - GST_WARNING_OBJECT (aggpad, - "Ignoring request of selecting the first start time " - "as the segment is a %s segment instead of a time segment", - gst_format_get_name (aggpad->segment.format)); - } - GST_OBJECT_UNLOCK (aggpad); - break; - case GST_AGGREGATOR_START_TIME_SELECTION_SET: - start_time = self->priv->start_time; - if (start_time == -1) - start_time = 0; - break; - } - - if (start_time != -1) { - if (self->segment.position == -1) - self->segment.position = start_time; - else - self->segment.position = MIN (start_time, self->segment.position); - - GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT, - GST_TIME_ARGS (start_time)); - } - } - - PAD_UNLOCK (aggpad); - GST_OBJECT_UNLOCK (self); - SRC_UNLOCK (self); - - GST_DEBUG_OBJECT (aggpad, "Done chaining"); - - return flow_return; - -flushing: - PAD_UNLOCK (aggpad); - - GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer", - gst_flow_get_name (flow_return)); - if (buffer) - gst_buffer_unref (buffer); - - return flow_return; -} - -static GstFlowReturn -gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) -{ - GstFlowReturn ret; - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - - PAD_FLUSH_LOCK (aggpad); - - ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object), - aggpad, buffer, TRUE); - - PAD_FLUSH_UNLOCK (aggpad); - - return ret; -} - -static gboolean -gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, - GstQuery * query) -{ - GstAggregator *self = GST_AGGREGATOR (parent); - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - - if (GST_QUERY_IS_SERIALIZED (query)) { - GstStructure *s; - gboolean ret = FALSE; - - SRC_LOCK (self); - PAD_LOCK (aggpad); - - if (aggpad->priv->flow_return != GST_FLOW_OK) { - SRC_UNLOCK (self); - goto flushing; - } - - g_queue_push_head (&aggpad->priv->data, query); - SRC_BROADCAST (self); - SRC_UNLOCK (self); - - while (!gst_aggregator_pad_queue_is_empty (aggpad) - && aggpad->priv->flow_return == GST_FLOW_OK) { - GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); - PAD_WAIT_EVENT (aggpad); - } - - s = gst_query_writable_structure (query); - if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret)) - gst_structure_remove_field (s, "gst-aggregator-retval"); - else - g_queue_remove (&aggpad->priv->data, query); - - if (aggpad->priv->flow_return != GST_FLOW_OK) - goto flushing; - - PAD_UNLOCK (aggpad); - - return ret; - } else { - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - - return klass->sink_query (self, aggpad, query); - } - -flushing: - GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", - gst_flow_get_name (aggpad->priv->flow_return)); - PAD_UNLOCK (aggpad); - - return FALSE; -} - -/* Queue serialized events and let the others go through directly. - * The queued events with be handled from the src-pad task in - * gst_aggregator_do_events_and_queries(). - */ -static GstFlowReturn -gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, - GstEvent * event) -{ - GstFlowReturn ret = GST_FLOW_OK; - GstAggregator *self = GST_AGGREGATOR (parent); - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - - if (GST_EVENT_IS_SERIALIZED (event) - && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { - SRC_LOCK (self); - PAD_LOCK (aggpad); - - if (aggpad->priv->flow_return != GST_FLOW_OK) - goto flushing; - - if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { - GST_OBJECT_LOCK (aggpad); - gst_event_copy_segment (event, &aggpad->priv->head_segment); - aggpad->priv->head_position = aggpad->priv->head_segment.position; - update_time_level (aggpad, TRUE); - GST_OBJECT_UNLOCK (aggpad); - } - - GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event); - g_queue_push_head (&aggpad->priv->data, event); - SRC_BROADCAST (self); - PAD_UNLOCK (aggpad); - SRC_UNLOCK (self); - } else { - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - - if (!klass->sink_event (self, aggpad, event)) { - /* Copied from GstPad to convert boolean to a GstFlowReturn in - * the event handling func */ - ret = GST_FLOW_ERROR; - } - } - - return ret; - -flushing: - GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event", - gst_flow_get_name (aggpad->priv->flow_return)); - PAD_UNLOCK (aggpad); - SRC_UNLOCK (self); - if (GST_EVENT_IS_STICKY (event)) - gst_pad_store_sticky_event (pad, event); - gst_event_unref (event); - - return aggpad->priv->flow_return; -} - -static gboolean -gst_aggregator_pad_activate_mode_func (GstPad * pad, - GstObject * parent, GstPadMode mode, gboolean active) -{ - GstAggregator *self = GST_AGGREGATOR (parent); - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - - if (active == FALSE) { - SRC_LOCK (self); - gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); - SRC_BROADCAST (self); - SRC_UNLOCK (self); - } else { - PAD_LOCK (aggpad); - aggpad->priv->flow_return = GST_FLOW_OK; - PAD_BROADCAST_EVENT (aggpad); - PAD_UNLOCK (aggpad); - } - - return TRUE; -} - -/*********************************** - * GstAggregatorPad implementation * - ************************************/ -G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD); - -static void -gst_aggregator_pad_constructed (GObject * object) -{ - GstPad *pad = GST_PAD (object); - - gst_pad_set_chain_function (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain)); - gst_pad_set_event_full_function_full (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL); - gst_pad_set_query_function (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func)); - gst_pad_set_activatemode_function (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func)); -} - -static void -gst_aggregator_pad_finalize (GObject * object) -{ - GstAggregatorPad *pad = (GstAggregatorPad *) object; - - g_cond_clear (&pad->priv->event_cond); - g_mutex_clear (&pad->priv->flush_lock); - g_mutex_clear (&pad->priv->lock); - - G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object); -} - -static void -gst_aggregator_pad_dispose (GObject * object) -{ - GstAggregatorPad *pad = (GstAggregatorPad *) object; - - gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE); - - G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object); -} - -static void -gst_aggregator_pad_class_init (GstAggregatorPadClass * klass) -{ - GObjectClass *gobject_class = (GObjectClass *) klass; - - g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate)); - - gobject_class->constructed = gst_aggregator_pad_constructed; - gobject_class->finalize = gst_aggregator_pad_finalize; - gobject_class->dispose = gst_aggregator_pad_dispose; -} - -static void -gst_aggregator_pad_init (GstAggregatorPad * pad) -{ - pad->priv = - G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, - GstAggregatorPadPrivate); - - g_queue_init (&pad->priv->data); - g_cond_init (&pad->priv->event_cond); - - g_mutex_init (&pad->priv->flush_lock); - g_mutex_init (&pad->priv->lock); - - gst_aggregator_pad_reset_unlocked (pad); - pad->priv->negotiated = FALSE; -} - -/* Must be called with the PAD_LOCK held */ -static void -gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad) -{ - pad->priv->num_buffers--; - GST_TRACE_OBJECT (pad, "Consuming buffer"); - PAD_BROADCAST_EVENT (pad); -} - -/* Must be called with the PAD_LOCK held */ -static void -gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) -{ - GstAggregator *self = NULL; - GstAggregatorClass *aggclass = NULL; - GstBuffer *buffer = NULL; - - while (pad->priv->clipped_buffer == NULL && - GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { - buffer = g_queue_pop_tail (&pad->priv->data); - - apply_buffer (pad, buffer, FALSE); - - /* We only take the parent here so that it's not taken if the buffer is - * already clipped or if the queue is empty. - */ - if (self == NULL) { - self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad))); - if (self == NULL) { - gst_buffer_unref (buffer); - return; - } - - aggclass = GST_AGGREGATOR_GET_CLASS (self); - } - - if (aggclass->clip) { - GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer); - - buffer = aggclass->clip (self, pad, buffer); - - if (buffer == NULL) { - gst_aggregator_pad_buffer_consumed (pad); - GST_TRACE_OBJECT (pad, "Clipping consumed the buffer"); - } - } - - pad->priv->clipped_buffer = buffer; - } - - if (self) - gst_object_unref (self); -} - -/** - * gst_aggregator_pad_steal_buffer: - * @pad: the pad to get buffer from - * - * Steal the ref to the buffer currently queued in @pad. - * - * Returns: (transfer full): The buffer in @pad or NULL if no buffer was - * queued. You should unref the buffer after usage. - */ -GstBuffer * -gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) -{ - GstBuffer *buffer; - - PAD_LOCK (pad); - - gst_aggregator_pad_clip_buffer_unlocked (pad); - - buffer = pad->priv->clipped_buffer; - - if (buffer) { - pad->priv->clipped_buffer = NULL; - gst_aggregator_pad_buffer_consumed (pad); - GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); - } - - PAD_UNLOCK (pad); - - return buffer; -} - -/** - * gst_aggregator_pad_drop_buffer: - * @pad: the pad where to drop any pending buffer - * - * Drop the buffer currently queued in @pad. - * - * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not. - */ -gboolean -gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) -{ - GstBuffer *buf; - - buf = gst_aggregator_pad_steal_buffer (pad); - - if (buf == NULL) - return FALSE; - - gst_buffer_unref (buf); - return TRUE; -} - -/** - * gst_aggregator_pad_get_buffer: - * @pad: the pad to get buffer from - * - * Returns: (transfer full): A reference to the buffer in @pad or - * NULL if no buffer was queued. You should unref the buffer after - * usage. - */ -GstBuffer * -gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) -{ - GstBuffer *buffer; - - PAD_LOCK (pad); - - gst_aggregator_pad_clip_buffer_unlocked (pad); - - if (pad->priv->clipped_buffer) { - buffer = gst_buffer_ref (pad->priv->clipped_buffer); - } else { - buffer = NULL; - } - PAD_UNLOCK (pad); - - return buffer; -} - -/** - * gst_aggregator_pad_is_eos: - * @pad: an aggregator pad - * - * Returns: %TRUE if the pad is EOS, otherwise %FALSE. - */ -gboolean -gst_aggregator_pad_is_eos (GstAggregatorPad * pad) -{ - gboolean is_eos; - - PAD_LOCK (pad); - is_eos = pad->priv->eos; - PAD_UNLOCK (pad); - - return is_eos; -} - -/** - * gst_aggregator_merge_tags: - * @self: a #GstAggregator - * @tags: a #GstTagList to merge - * @mode: the #GstTagMergeMode to use - * - * Adds tags to so-called pending tags, which will be processed - * before pushing out data downstream. - * - * Note that this is provided for convenience, and the subclass is - * not required to use this and can still do tag handling on its own. - * - * MT safe. - */ -void -gst_aggregator_merge_tags (GstAggregator * self, - const GstTagList * tags, GstTagMergeMode mode) -{ - GstTagList *otags; - - g_return_if_fail (GST_IS_AGGREGATOR (self)); - g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags)); - - /* FIXME Check if we can use OBJECT lock here! */ - GST_OBJECT_LOCK (self); - if (tags) - GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags); - otags = self->priv->tags; - self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode); - if (otags) - gst_tag_list_unref (otags); - self->priv->tags_changed = TRUE; - GST_OBJECT_UNLOCK (self); -} - -/** - * gst_aggregator_set_latency: - * @self: a #GstAggregator - * @min_latency: minimum latency - * @max_latency: maximum latency - * - * Lets #GstAggregator sub-classes tell the baseclass what their internal - * latency is. Will also post a LATENCY message on the bus so the pipeline - * can reconfigure its global latency. - */ -void -gst_aggregator_set_latency (GstAggregator * self, - GstClockTime min_latency, GstClockTime max_latency) -{ - gboolean changed = FALSE; - - g_return_if_fail (GST_IS_AGGREGATOR (self)); - g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency)); - g_return_if_fail (max_latency >= min_latency); - - SRC_LOCK (self); - if (self->priv->sub_latency_min != min_latency) { - self->priv->sub_latency_min = min_latency; - changed = TRUE; - } - if (self->priv->sub_latency_max != max_latency) { - self->priv->sub_latency_max = max_latency; - changed = TRUE; - } - - if (changed) - SRC_BROADCAST (self); - SRC_UNLOCK (self); - - if (changed) { - gst_element_post_message (GST_ELEMENT_CAST (self), - gst_message_new_latency (GST_OBJECT_CAST (self))); - } -} - -/** - * gst_aggregator_get_buffer_pool: - * @self: a #GstAggregator - * - * Returns: (transfer full): the instance of the #GstBufferPool used - * by @trans; free it after use it - */ -GstBufferPool * -gst_aggregator_get_buffer_pool (GstAggregator * self) -{ - GstBufferPool *pool; - - g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL); - - GST_OBJECT_LOCK (self); - pool = self->priv->pool; - if (pool) - gst_object_ref (pool); - GST_OBJECT_UNLOCK (self); - - return pool; -} - -/** - * gst_aggregator_get_allocator: - * @self: a #GstAggregator - * @allocator: (out) (allow-none) (transfer full): the #GstAllocator - * used - * @params: (out) (allow-none) (transfer full): the - * #GstAllocationParams of @allocator - * - * Lets #GstAggregator sub-classes get the memory @allocator - * acquired by the base class and its @params. - * - * Unref the @allocator after use it. - */ -void -gst_aggregator_get_allocator (GstAggregator * self, - GstAllocator ** allocator, GstAllocationParams * params) -{ - g_return_if_fail (GST_IS_AGGREGATOR (self)); - - if (allocator) - *allocator = self->priv->allocator ? - gst_object_ref (self->priv->allocator) : NULL; - - if (params) - *params = self->priv->allocation_params; -} diff --git a/gst-libs/gst/base/gstaggregator.h b/gst-libs/gst/base/gstaggregator.h deleted file mode 100644 index fc2d6aeae..000000000 --- a/gst-libs/gst/base/gstaggregator.h +++ /dev/null @@ -1,336 +0,0 @@ -/* GStreamer aggregator base class - * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@oencreed.com> - * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - -#ifndef __GST_AGGREGATOR_H__ -#define __GST_AGGREGATOR_H__ - -#ifndef GST_USE_UNSTABLE_API -#warning "The Base library from gst-plugins-bad is unstable API and may change in future." -#warning "You can define GST_USE_UNSTABLE_API to avoid this warning." -#endif - -#include <gst/gst.h> - -G_BEGIN_DECLS - -/************************** - * GstAggregator Structs * - *************************/ - -typedef struct _GstAggregator GstAggregator; -typedef struct _GstAggregatorPrivate GstAggregatorPrivate; -typedef struct _GstAggregatorClass GstAggregatorClass; - -/************************ - * GstAggregatorPad API * - ***********************/ - -#define GST_TYPE_AGGREGATOR_PAD (gst_aggregator_pad_get_type()) -#define GST_AGGREGATOR_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_AGGREGATOR_PAD, GstAggregatorPad)) -#define GST_AGGREGATOR_PAD_CAST(obj) ((GstAggregatorPad *)(obj)) -#define GST_AGGREGATOR_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_AGGREGATOR_PAD, GstAggregatorPadClass)) -#define GST_AGGREGATOR_PAD_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_AGGREGATOR_PAD, GstAggregatorPadClass)) -#define GST_IS_AGGREGATOR_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_AGGREGATOR_PAD)) -#define GST_IS_AGGREGATOR_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_AGGREGATOR_PAD)) - -/**************************** - * GstAggregatorPad Structs * - ***************************/ - -typedef struct _GstAggregatorPad GstAggregatorPad; -typedef struct _GstAggregatorPadClass GstAggregatorPadClass; -typedef struct _GstAggregatorPadPrivate GstAggregatorPadPrivate; - -/** - * GstAggregatorPad: - * @segment: last segment received. - * - * The implementation the GstPad to use with #GstAggregator - */ -struct _GstAggregatorPad -{ - GstPad parent; - - /* Protected by the OBJECT_LOCK */ - GstSegment segment; - - /* < Private > */ - GstAggregatorPadPrivate * priv; - - gpointer _gst_reserved[GST_PADDING]; -}; - -/** - * GstAggregatorPadClass: - * @flush: Optional - * Called when the pad has received a flush stop, this is the place - * to flush any information specific to the pad, it allows for individual - * pads to be flushed while others might not be. - * - */ -struct _GstAggregatorPadClass -{ - GstPadClass parent_class; - - GstFlowReturn (*flush) (GstAggregatorPad * aggpad, GstAggregator * aggregator); - - /*< private >*/ - gpointer _gst_reserved[GST_PADDING_LARGE]; -}; - -GST_EXPORT -GType gst_aggregator_pad_get_type (void); - -/**************************** - * GstAggregatorPad methods * - ***************************/ - -GST_EXPORT -GstBuffer * gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad); - -GST_EXPORT -GstBuffer * gst_aggregator_pad_get_buffer (GstAggregatorPad * pad); - -GST_EXPORT -gboolean gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad); - -GST_EXPORT -gboolean gst_aggregator_pad_is_eos (GstAggregatorPad * pad); - -/********************* - * GstAggregator API * - ********************/ - -#define GST_TYPE_AGGREGATOR (gst_aggregator_get_type()) -#define GST_AGGREGATOR(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_AGGREGATOR,GstAggregator)) -#define GST_AGGREGATOR_CAST(obj) ((GstAggregator *)(obj)) -#define GST_AGGREGATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_AGGREGATOR,GstAggregatorClass)) -#define GST_AGGREGATOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_AGGREGATOR,GstAggregatorClass)) -#define GST_IS_AGGREGATOR(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_AGGREGATOR)) -#define GST_IS_AGGREGATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_AGGREGATOR)) - -#define GST_AGGREGATOR_FLOW_NEED_DATA GST_FLOW_CUSTOM_ERROR - -/** - * GstAggregator: - * @srcpad: the aggregator's source pad - * @segment: the output segment - * - * Aggregator base class object structure. - */ -struct _GstAggregator -{ - GstElement parent; - - GstPad * srcpad; - - /* Only access with the object lock held */ - GstSegment segment; - - /*< private >*/ - GstAggregatorPrivate * priv; - - gpointer _gst_reserved[GST_PADDING_LARGE]; -}; - -/** - * GstAggregatorClass: - * @flush: Optional. - * Called after a successful flushing seek, once all the flush - * stops have been received. Flush pad-specific data in - * #GstAggregatorPad->flush. - * @clip: Optional. - * Called when a buffer is received on a sink pad, the task of - * clipping it and translating it to the current segment falls - * on the subclass. The function should use the segment of data - * and the negotiated media type on the pad to perform - * clipping of input buffer. This function takes ownership of - * buf and should output a buffer or return NULL in - * if the buffer should be dropped. - * @sink_event: Optional. - * Called when an event is received on a sink pad, the subclass - * should always chain up. - * @sink_query: Optional. - * Called when a query is received on a sink pad, the subclass - * should always chain up. - * @src_event: Optional. - * Called when an event is received on the src pad, the subclass - * should always chain up. - * @src_query: Optional. - * Called when a query is received on the src pad, the subclass - * should always chain up. - * @src_activate: Optional. - * Called when the src pad is activated, it will start/stop its - * pad task right after that call. - * @aggregate: Mandatory. - * Called when buffers are queued on all sinkpads. Classes - * should iterate the GstElement->sinkpads and peek or steal - * buffers from the #GstAggregatorPads. If the subclass returns - * GST_FLOW_EOS, sending of the eos event will be taken care - * of. Once / if a buffer has been constructed from the - * aggregated buffers, the subclass should call _finish_buffer. - * @stop: Optional. - * Called when the element goes from PAUSED to READY. - * The subclass should free all resources and reset its state. - * @start: Optional. - * Called when the element goes from READY to PAUSED. - * The subclass should get ready to process - * aggregated buffers. - * @get_next_time: Optional. - * Called when the element needs to know the running time of the next - * rendered buffer for live pipelines. This causes deadline - * based aggregation to occur. Defaults to returning - * GST_CLOCK_TIME_NONE causing the element to wait for buffers - * on all sink pads before aggregating. - * @update_src_caps: Lets subclasses update the #GstCaps representing - * the src pad caps before usage. The result should end up - * in @ret. Return %GST_AGGREGATOR_FLOW_NEED_DATA to indicate that the - * element needs more information (caps, a buffer, etc) to - * choose the correct caps. Should return ANY caps if the - * stream has not caps at all. - * @fixate_src_caps: Optional. - * Fixate and return the src pad caps provided. The function takes - * ownership of @caps and returns a fixated version of - * @caps. @caps is not guaranteed to be writable. - * @negotiated_src_caps: Optional. - * Notifies subclasses what caps format has been negotiated - * @decide_allocation: Optional. - * Allows the subclass to influence the allocation choices. - * Setup the allocation parameters for allocating output - * buffers. The passed in query contains the result of the - * downstream allocation query. - * - * The aggregator base class will handle in a thread-safe way all manners of - * concurrent flushes, seeks, pad additions and removals, leaving to the - * subclass the responsibility of clipping buffers, and aggregating buffers in - * the way the implementor sees fit. - * - * It will also take care of event ordering (stream-start, segment, eos). - * - * Basically, a simple implementation will override @aggregate, and call - * _finish_buffer from inside that function. - */ -struct _GstAggregatorClass { - GstElementClass parent_class; - - GstFlowReturn (*flush) (GstAggregator * aggregator); - - GstBuffer * (*clip) (GstAggregator * aggregator, - GstAggregatorPad * aggregator_pad, - GstBuffer * buf); - - /* sinkpads virtual methods */ - gboolean (*sink_event) (GstAggregator * aggregator, - GstAggregatorPad * aggregator_pad, - GstEvent * event); - - gboolean (*sink_query) (GstAggregator * aggregator, - GstAggregatorPad * aggregator_pad, - GstQuery * query); - - /* srcpad virtual methods */ - gboolean (*src_event) (GstAggregator * aggregator, - GstEvent * event); - - gboolean (*src_query) (GstAggregator * aggregator, - GstQuery * query); - - gboolean (*src_activate) (GstAggregator * aggregator, - GstPadMode mode, - gboolean active); - - GstFlowReturn (*aggregate) (GstAggregator * aggregator, - gboolean timeout); - - gboolean (*stop) (GstAggregator * aggregator); - - gboolean (*start) (GstAggregator * aggregator); - - GstClockTime (*get_next_time) (GstAggregator * aggregator); - - GstAggregatorPad * (*create_new_pad) (GstAggregator * self, - GstPadTemplate * templ, - const gchar * req_name, - const GstCaps * caps); - GstFlowReturn (*update_src_caps) (GstAggregator * self, - GstCaps * caps, - GstCaps ** ret); - GstCaps * (*fixate_src_caps) (GstAggregator * self, - GstCaps * caps); - gboolean (*negotiated_src_caps) (GstAggregator * self, - GstCaps * caps); - gboolean (*decide_allocation) (GstAggregator * self, - GstQuery * query); - gboolean (*propose_allocation) (GstAggregator * self, - GstAggregatorPad * pad, - GstQuery * decide_query, - GstQuery * query); - /*< private >*/ - gpointer _gst_reserved[GST_PADDING_LARGE]; -}; - -/************************************ - * GstAggregator convenience macros * - ***********************************/ - -/** - * GST_AGGREGATOR_SRC_PAD: - * @agg: a #GstAggregator - * - * Convenience macro to access the source pad of #GstAggregator - * - * Since: 1.6 - */ -#define GST_AGGREGATOR_SRC_PAD(agg) (((GstAggregator *)(agg))->srcpad) - -/************************* - * GstAggregator methods * - ************************/ - -GST_EXPORT -GstFlowReturn gst_aggregator_finish_buffer (GstAggregator * self, - GstBuffer * buffer); - -GST_EXPORT -void gst_aggregator_set_src_caps (GstAggregator * self, - GstCaps * caps); - -GST_EXPORT -void gst_aggregator_set_latency (GstAggregator * self, - GstClockTime min_latency, - GstClockTime max_latency); - -GST_EXPORT -GType gst_aggregator_get_type(void); - -GST_EXPORT -GstClockTime gst_aggregator_get_latency (GstAggregator * self); - -GST_EXPORT -GstBufferPool * gst_aggregator_get_buffer_pool (GstAggregator * self); - -GST_EXPORT -void gst_aggregator_get_allocator (GstAggregator * self, - GstAllocator ** allocator, - GstAllocationParams * params); - -G_END_DECLS - -#endif /* __GST_AGGREGATOR_H__ */ diff --git a/gst-libs/gst/base/meson.build b/gst-libs/gst/base/meson.build deleted file mode 100644 index 7cffba075..000000000 --- a/gst-libs/gst/base/meson.build +++ /dev/null @@ -1,17 +0,0 @@ -badbase_sources = ['gstaggregator.c'] -badbase_headers = ['gstaggregator.h'] -install_headers(badbase_headers, subdir : 'gstreamer-1.0/gst/base') - -gstbadbase = library('gstbadbase-' + api_version, - badbase_sources, - c_args : gst_plugins_bad_args + ['-DGST_USE_UNSTABLE_API'], - include_directories : [configinc, libsinc], - version : libversion, - soversion : soversion, - install : true, - dependencies : [gst_dep], -) - -gstbadbase_dep = declare_dependency(link_with : gstbadbase, - include_directories : [libsinc], - dependencies : [gst_dep]) diff --git a/gst-libs/gst/meson.build b/gst-libs/gst/meson.build index a4d7dec72..63267eaa9 100644 --- a/gst-libs/gst/meson.build +++ b/gst-libs/gst/meson.build @@ -1,4 +1,3 @@ -subdir('base') subdir('uridownloader') subdir('allocators') diff --git a/gst-libs/gst/video/Makefile.am b/gst-libs/gst/video/Makefile.am index 6cdad6f53..e51f6bb52 100644 --- a/gst-libs/gst/video/Makefile.am +++ b/gst-libs/gst/video/Makefile.am @@ -17,7 +17,6 @@ libgstbadvideo_@GST_API_VERSION@_la_CFLAGS = \ $(GST_BASE_CFLAGS) libgstbadvideo_@GST_API_VERSION@_la_LIBADD = \ - $(top_builddir)/gst-libs/gst/base/libgstbadbase-$(GST_API_VERSION).la \ $(GST_PLUGINS_BASE_LIBS) -lgstvideo-$(GST_API_VERSION) \ $(GST_BASE_LIBS) $(GST_LIBS) $(ORC_LIBS) $(LIBM) diff --git a/gst-libs/gst/video/meson.build b/gst-libs/gst/video/meson.build index 9c38628d3..3bec5988a 100644 --- a/gst-libs/gst/video/meson.build +++ b/gst-libs/gst/video/meson.build @@ -15,9 +15,9 @@ gstbadvideo = library('gstbadvideo-' + api_version, version : libversion, soversion : soversion, install : true, - dependencies : [gstvideo_dep, gstbadbase_dep], + dependencies : [gstvideo_dep, gstbase_dep], ) gstbadvideo_dep = declare_dependency(link_with : gstbadvideo, include_directories : [libsinc], - dependencies : [gstvideo_dep, gstbadbase_dep]) + dependencies : [gstvideo_dep, gstbase_dep]) |