diff options
author | Thiago Santos <ts.santos@sisa.samsung.com> | 2014-07-01 10:07:40 -0300 |
---|---|---|
committer | Thiago Santos <ts.santos@osg.samsung.com> | 2014-07-22 08:51:32 -0300 |
commit | ea7b843244f3494a96b43d0a5dcaed69b8a5bf56 (patch) | |
tree | bfe7ed599a426197eafc758979dd4ed31689bc02 /gst/gdp | |
parent | bd4ae189388386baa34bdcd1dfe6f49c8d568649 (diff) | |
download | gstreamer-plugins-bad-ea7b843244f3494a96b43d0a5dcaed69b8a5bf56.tar.gz |
gdppay: put all sticky events in streamheader
Use the sticky events to compose the streamheader as they are the
ones that are persisted to config new pads linked. Instead of storing
them ourselves rely on the pad storage that already orders it for us
https://bugzilla.gnome.org/show_bug.cgi?id=732596
Diffstat (limited to 'gst/gdp')
-rw-r--r-- | gst/gdp/gstgdppay.c | 246 | ||||
-rw-r--r-- | gst/gdp/gstgdppay.h | 9 |
2 files changed, 101 insertions, 154 deletions
diff --git a/gst/gdp/gstgdppay.c b/gst/gdp/gstgdppay.c index d8bc818dc..708e13790 100644 --- a/gst/gdp/gstgdppay.c +++ b/gst/gdp/gstgdppay.c @@ -188,23 +188,11 @@ gst_gdp_pay_reset (GstGDPPay * this) gst_caps_unref (this->caps); this->caps = NULL; } - if (this->caps_buf) { - gst_buffer_unref (this->caps_buf); - this->caps_buf = NULL; - } - if (this->tag_buf) { - gst_buffer_unref (this->tag_buf); - this->tag_buf = NULL; - } - if (this->new_segment_buf) { - gst_buffer_unref (this->new_segment_buf); - this->new_segment_buf = NULL; - } - if (this->streamstartid_buf) { - gst_buffer_unref (this->streamstartid_buf); - this->streamstartid_buf = NULL; - } + this->have_caps = FALSE; + this->have_segment = FALSE; + this->have_streamstartid = FALSE; this->sent_streamheader = FALSE; + this->reset_streamheader = FALSE; this->offset = 0; } @@ -308,6 +296,48 @@ no_event: } } +static void +gdp_streamheader_array_append_buffer (GValue * array, GstBuffer * buf) +{ + GValue value = { 0, }; + + g_value_init (&value, GST_TYPE_BUFFER); + gst_value_set_buffer (&value, buf); + gst_value_array_append_value (array, &value); + g_value_unset (&value); +} + +typedef struct +{ + GstGDPPay *gdppay; + GValue *array; +} GstGDPPayAndArray; + +static gboolean +gdp_streamheader_array_store_events (GstPad * pad, GstEvent ** event, + gpointer udata) +{ + GstGDPPayAndArray *gdp_and_array = udata; + GstGDPPay *this = gdp_and_array->gdppay; + GValue *array = gdp_and_array->array; + GstBuffer *buf; + + /* Need to handle caps differently to keep compatibility with 1.0 */ + if (GST_EVENT_TYPE (*event) == GST_EVENT_CAPS) { + GstCaps *caps; + + gst_event_parse_caps (*event, &caps); + buf = gst_gdp_buffer_from_caps (this, caps); + } else { + buf = gst_gdp_buffer_from_event (this, *event); + } + + GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_HEADER); + gst_gdp_stamp_buffer (this, buf); + gdp_streamheader_array_append_buffer (array, buf); + + return TRUE; +} /* set our caps with streamheader, based on the latest newsegment and caps, * and (possibly) GDP-serialized buffers of the streamheaders on the src pad */ @@ -315,14 +345,15 @@ static GstFlowReturn gst_gdp_pay_reset_streamheader (GstGDPPay * this) { GstCaps *caps; - /* We use copies of these to avoid circular refcounts */ - GstBuffer *new_segment_buf, *caps_buf, *tag_buf, *streamstartid_buf; GstStructure *structure; GstFlowReturn r = GST_FLOW_OK; gboolean version_one_zero = TRUE; + GstGDPPayAndArray gdp_and_array; GValue array = { 0 }; - GValue value = { 0 }; + + gdp_and_array.gdppay = this; + gdp_and_array.array = &array; GST_DEBUG_OBJECT (this, "start"); /* In version 0.2, we didn't need or send new segment or tags */ @@ -330,13 +361,13 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) version_one_zero = FALSE; if (version_one_zero) { - if (!this->new_segment_buf || !this->caps_buf || !this->streamstartid_buf) { + if (!this->have_segment || !this->have_caps || !this->have_streamstartid) { GST_DEBUG_OBJECT (this, "1.0, missing new_segment or caps or stream " "start id, returning"); return GST_FLOW_OK; } } else { - if (!this->caps_buf) { + if (!this->have_caps) { GST_DEBUG_OBJECT (this, "0.2, missing caps, returning"); return GST_FLOW_OK; } @@ -346,51 +377,24 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) * Stamp the buffers with offset and offset_end as well. * We do this here so the offsets match the order the buffers go out in */ g_value_init (&array, GST_TYPE_ARRAY); - if (version_one_zero) { - gst_gdp_stamp_buffer (this, this->streamstartid_buf); - GST_DEBUG_OBJECT (this, "appending copy of stream start id buffer %p", - this->streamstartid_buf); - streamstartid_buf = gst_buffer_copy (this->streamstartid_buf); - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, streamstartid_buf); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); - gst_buffer_unref (streamstartid_buf); - } + gst_pad_sticky_events_foreach (this->sinkpad, + gdp_streamheader_array_store_events, &gdp_and_array); + } else { + GstEvent *capsevent = + gst_pad_get_sticky_event (this->sinkpad, GST_EVENT_CAPS, 0); + if (capsevent) { + GstCaps *caps; + GstBuffer *capsbuffer = NULL; - gst_gdp_stamp_buffer (this, this->caps_buf); - GST_DEBUG_OBJECT (this, "appending copy of caps buffer %p", this->caps_buf); - caps_buf = gst_buffer_copy (this->caps_buf); - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, caps_buf); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); - gst_buffer_unref (caps_buf); + gst_event_parse_caps (capsevent, &caps); + capsbuffer = gst_gdp_buffer_from_caps (this, caps); - if (version_one_zero) { - gst_gdp_stamp_buffer (this, this->new_segment_buf); - GST_DEBUG_OBJECT (this, "1.0, appending copy of new segment buffer %p", - this->new_segment_buf); - new_segment_buf = gst_buffer_copy (this->new_segment_buf); - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, new_segment_buf); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); - gst_buffer_unref (new_segment_buf); - - if (this->tag_buf) { - gst_gdp_stamp_buffer (this, this->tag_buf); - GST_DEBUG_OBJECT (this, "1.0, appending current tags buffer %p", - this->tag_buf); - tag_buf = this->tag_buf; - this->tag_buf = NULL; - - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, tag_buf); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); - gst_buffer_unref (tag_buf); + gst_gdp_stamp_buffer (this, capsbuffer); + gdp_streamheader_array_append_buffer (&array, capsbuffer); + + gst_buffer_unref (capsbuffer); + gst_event_unref (capsevent); } } @@ -399,11 +403,8 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) structure = gst_caps_get_structure (this->caps, 0); if (gst_structure_has_field (structure, "streamheader")) { const GValue *sh; - GArray *buffers; - GstBuffer *buffer; - int i; sh = gst_structure_get_value (structure, "streamheader"); @@ -440,10 +441,7 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) GST_BUFFER_OFFSET_END (outbuffer) = GST_BUFFER_OFFSET_NONE; GST_BUFFER_TIMESTAMP (outbuffer) = GST_CLOCK_TIME_NONE; - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, outbuffer); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); + gdp_streamheader_array_append_buffer (&array, outbuffer); gst_buffer_unref (outbuffer); } @@ -478,40 +476,6 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) } } - /* push out these streamheader buffers, then flush our internal queue */ - GST_DEBUG_OBJECT (this, "Pushing GDP stream-start-id buffer %p", - this->streamstartid_buf); - r = gst_pad_push (this->srcpad, gst_buffer_ref (this->streamstartid_buf)); - if (r != GST_FLOW_OK) { - GST_WARNING_OBJECT (this, "pushing GDP stream-start-id buffer returned %d", - r); - goto done; - } - GST_DEBUG_OBJECT (this, "Pushing GDP caps buffer %p", this->caps_buf); - r = gst_pad_push (this->srcpad, gst_buffer_ref (this->caps_buf)); - if (r != GST_FLOW_OK) { - GST_WARNING_OBJECT (this, "pushing GDP caps buffer returned %d", r); - goto done; - } - GST_DEBUG_OBJECT (this, "Pushing GDP new_segment buffer %p with offset %" - G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, this->new_segment_buf, - GST_BUFFER_OFFSET (this->new_segment_buf), - GST_BUFFER_OFFSET_END (this->new_segment_buf)); - /* we stored these bufs with refcount 1, so make sure we keep a ref */ - r = gst_pad_push (this->srcpad, gst_buffer_ref (this->new_segment_buf)); - if (r != GST_FLOW_OK) { - GST_WARNING_OBJECT (this, "pushing GDP newsegment buffer returned %d", r); - goto done; - } - if (this->tag_buf) { - GST_DEBUG_OBJECT (this, "Pushing GDP tag buffer %p", this->tag_buf); - /* we stored these bufs with refcount 1, so make sure we keep a ref */ - r = gst_pad_push (this->srcpad, gst_buffer_ref (this->tag_buf)); - if (r != GST_FLOW_OK) { - GST_WARNING_OBJECT (this, "pushing GDP tag buffer returned %d", r); - goto done; - } - } this->sent_streamheader = TRUE; GST_DEBUG_OBJECT (this, "need to push %d queued buffers", g_list_length (this->queue)); @@ -524,7 +488,6 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) /* delete buffer from queue now */ this->queue = g_list_delete_link (this->queue, this->queue); - /* set caps and push */ r = gst_pad_push (this->srcpad, buffer); if (r != GST_FLOW_OK) { GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r); @@ -532,6 +495,8 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) } } + this->reset_streamheader = FALSE; + done: gst_caps_unref (caps); GST_DEBUG_OBJECT (this, "stop"); @@ -551,7 +516,7 @@ no_buffer: static GstFlowReturn gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer) { - if (this->sent_streamheader) { + if (this->sent_streamheader && !this->reset_streamheader) { GST_LOG_OBJECT (this, "Pushing GDP buffer %p, caps %" GST_PTR_FORMAT, buffer, this->caps); return gst_pad_push (this->srcpad, buffer); @@ -559,12 +524,10 @@ gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer) /* store it on an internal queue. buffer remains reffed. */ this->queue = g_list_append (this->queue, buffer); - GST_DEBUG_OBJECT (this, "streamheader not sent yet, " + GST_DEBUG_OBJECT (this, "streamheader not sent yet or needs update, " "queued buffer %p, now %d buffers queued", buffer, g_list_length (this->queue)); - gst_gdp_pay_reset_streamheader (this); - return GST_FLOW_OK; } @@ -582,7 +545,7 @@ gst_gdp_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) /* we should have received a new_segment before, otherwise it's a bug. * fake one in that case */ - if (!this->new_segment_buf) { + if (!this->have_segment) { GstEvent *event; GstSegment segment; @@ -603,7 +566,7 @@ gst_gdp_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); GST_DEBUG_OBJECT (this, "Storing buffer %p as new_segment_buf", outbuffer); - this->new_segment_buf = outbuffer; + this->have_segment = TRUE; } } /* make sure we've received caps before */ @@ -629,6 +592,9 @@ gst_gdp_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer); GST_BUFFER_DURATION (outbuffer) = GST_BUFFER_DURATION (buffer); + if (this->reset_streamheader) + gst_gdp_pay_reset_streamheader (this); + ret = gst_gdp_queue_buffer (this, outbuffer); done: @@ -689,28 +655,16 @@ gst_gdp_pay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) * and not send it on */ switch (GST_EVENT_TYPE (event)) { case GST_EVENT_STREAM_START: - GST_DEBUG_OBJECT (this, "Storing stream start id in buffer %p", - outbuffer); - - if (this->streamstartid_buf) - gst_buffer_unref (this->streamstartid_buf); - this->streamstartid_buf = outbuffer; - - GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); - gst_gdp_pay_reset_streamheader (this); + GST_DEBUG_OBJECT (this, "Received stream start id"); + this->have_streamstartid = TRUE; break; case GST_EVENT_SEGMENT: - GST_DEBUG_OBJECT (this, "Storing in caps buffer %p as new_segment_buf", - outbuffer); - - if (this->new_segment_buf) - gst_buffer_unref (this->new_segment_buf); - this->new_segment_buf = outbuffer; - - GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); - gst_gdp_pay_reset_streamheader (this); + GST_DEBUG_OBJECT (this, "Received segment %" GST_PTR_FORMAT, event); + this->have_segment = TRUE; break; case GST_EVENT_CAPS:{ + GST_DEBUG_OBJECT (this, "Received caps %" GST_PTR_FORMAT, event); + this->have_caps = TRUE; gst_event_parse_caps (event, &caps); gst_buffer_replace (&outbuffer, NULL); if (this->caps == NULL || !gst_caps_is_equal (this->caps, caps)) { @@ -721,34 +675,18 @@ gst_gdp_pay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) goto no_buffer_from_caps; GST_BUFFER_DURATION (outbuffer) = 0; - GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); - if (this->caps_buf) - gst_buffer_unref (this->caps_buf); - this->caps_buf = outbuffer; - gst_gdp_pay_reset_streamheader (this); } break; } - case GST_EVENT_TAG: - GST_DEBUG_OBJECT (this, "Storing in caps buffer %p as tag_buf", - outbuffer); - - if (this->tag_buf) - gst_buffer_unref (this->tag_buf); - this->tag_buf = outbuffer; - - GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); - gst_gdp_pay_reset_streamheader (this); - break; default: - GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer, - event); - flowret = gst_gdp_queue_buffer (this, outbuffer); - if (flowret != GST_FLOW_OK) - goto push_error; break; } + if (GST_EVENT_IS_STICKY (event)) { + GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); + this->reset_streamheader = TRUE; + } + /* if we have EOS, we should send on EOS ourselves */ if (GST_EVENT_TYPE (event) == GST_EVENT_EOS || GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) { @@ -757,6 +695,14 @@ gst_gdp_pay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) ret = gst_pad_push_event (this->srcpad, gst_event_ref (event)); } + if (GST_EVENT_TYPE (event) != GST_EVENT_EOS) { + GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer, + event); + flowret = gst_gdp_queue_buffer (this, outbuffer); + if (flowret != GST_FLOW_OK) + goto push_error; + } + done: gst_event_unref (event); diff --git a/gst/gdp/gstgdppay.h b/gst/gdp/gstgdppay.h index d4433ef77..1dc71aef1 100644 --- a/gst/gdp/gstgdppay.h +++ b/gst/gdp/gstgdppay.h @@ -52,10 +52,11 @@ struct _GstGDPPay GstCaps *caps; /* incoming caps */ - GstBuffer *streamstartid_buf; - GstBuffer *caps_buf; - GstBuffer *new_segment_buf; - GstBuffer *tag_buf; + gboolean have_streamstartid; + gboolean have_caps; + gboolean have_segment; + + gboolean reset_streamheader; gboolean sent_streamheader; /* TRUE after the first streamheaders are sent */ GList *queue; /* list of queued buffers before streamheaders are sent */ |