diff options
author | Linus Svensson <linussn@axis.com> | 2015-11-06 09:44:16 +0100 |
---|---|---|
committer | Olivier CrĂȘte <olivier.crete@collabora.com> | 2015-11-06 12:55:25 -0500 |
commit | a58826292eab80e83b9de525753895e56410e301 (patch) | |
tree | 96e33c811d871323bf24388651d338dbcc0f47fa /gst | |
parent | fd0ca0a9723931463c8be93517d13b5ef799f6f0 (diff) | |
download | gstreamer-plugins-bad-a58826292eab80e83b9de525753895e56410e301.tar.gz |
rtponviftimestamp: Do not rearange order of data
If a buffer or a buffer list is cached, no events serialized with the
data stream should get through. The cached buffers and events should
be purged when we stop flushing.
https://bugzilla.gnome.org/show_bug.cgi?id=757688
Diffstat (limited to 'gst')
-rw-r--r-- | gst/onvif/gstrtponviftimestamp.c | 164 | ||||
-rw-r--r-- | gst/onvif/gstrtponviftimestamp.h | 2 |
2 files changed, 119 insertions, 47 deletions
diff --git a/gst/onvif/gstrtponviftimestamp.c b/gst/onvif/gstrtponviftimestamp.c index 818c8afc6..b539c83aa 100644 --- a/gst/onvif/gstrtponviftimestamp.c +++ b/gst/onvif/gstrtponviftimestamp.c @@ -42,6 +42,11 @@ static GstFlowReturn gst_rtp_onvif_timestamp_chain (GstPad * pad, static GstFlowReturn gst_rtp_onvif_timestamp_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list); +static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self, + GstBuffer * buf, gboolean end_contiguous); +static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self, + GstBufferList * list, gboolean end_contiguous); + static GstStaticPadTemplate sink_template_factory = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, @@ -112,6 +117,65 @@ gst_rtp_onvif_timestamp_set_property (GObject * object, } } +/* send cached buffer or list, and events, if present */ +static GstFlowReturn +send_cached_buffer_and_events (GstRtpOnvifTimestamp * self, + gboolean end_contiguous) +{ + GstFlowReturn ret = GST_FLOW_OK; + + g_assert (!(self->buffer && self->list)); + + if (self->buffer) { + GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->buffer); + ret = handle_and_push_buffer (self, self->buffer, end_contiguous); + self->buffer = NULL; + } + if (self->list) { + GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->list); + ret = handle_and_push_buffer_list (self, self->list, end_contiguous); + self->list = NULL; + } + + if (ret != GST_FLOW_OK) + goto out; + + while (!g_queue_is_empty (self->event_queue)) { + GstEvent *event; + + event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue)); + GST_LOG_OBJECT (self->sinkpad, "sending %" GST_PTR_FORMAT, event); + (void) gst_pad_send_event (self->sinkpad, event); + } + +out: + return ret; +} + +static void +purge_cached_buffer_and_events (GstRtpOnvifTimestamp * self) +{ + g_assert (!(self->buffer && self->list)); + + if (self->buffer) { + GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->buffer); + gst_buffer_unref (self->buffer); + self->buffer = NULL; + } + if (self->list) { + GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->list); + gst_buffer_list_unref (self->list); + self->list = NULL; + } + + while (!g_queue_is_empty (self->event_queue)) { + GstEvent *event; + + event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue)); + gst_event_unref (event); + } +} + static GstStateChangeReturn gst_rtp_onvif_timestamp_change_state (GstElement * element, GstStateChange transition) @@ -121,6 +185,7 @@ gst_rtp_onvif_timestamp_change_state (GstElement * element, switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: + purge_cached_buffer_and_events (self); gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED); break; default: @@ -161,10 +226,7 @@ gst_rtp_onvif_timestamp_finalize (GObject * object) { GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (object); - if (self->buffer) - gst_buffer_unref (self->buffer); - if (self->list) - gst_buffer_list_unref (self->list); + g_queue_free (self->event_queue); G_OBJECT_CLASS (gst_rtp_onvif_timestamp_parent_class)->finalize (object); } @@ -219,42 +281,62 @@ gst_rtp_onvif_timestamp_class_init (GstRtpOnvifTimestampClass * klass) 0, "ONVIF NTP timestamps RTP extension"); } -static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self, - GstBuffer * buf, gboolean end_contiguous); -static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self, - GstBufferList * list, gboolean end_contiguous); - static gboolean gst_rtp_onvif_timestamp_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) { GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent); + gboolean drop = FALSE; + gboolean ret = TRUE; GST_DEBUG_OBJECT (pad, "handling event %s", GST_EVENT_TYPE_NAME (event)); + /* handle serialized events, which, should not be enqueued */ switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_SEGMENT: - gst_event_copy_segment (event, &self->segment); - break; + case GST_EVENT_EOS: + { + GstFlowReturn res; + /* Push pending buffers, if any */ + res = send_cached_buffer_and_events (self, TRUE); + if (res != GST_FLOW_OK) { + drop = TRUE; + ret = FALSE; + goto out; + } + break; + } case GST_EVENT_FLUSH_STOP: + purge_cached_buffer_and_events (self); gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED); break; - case GST_EVENT_EOS: - /* Push pending buffers, if any */ - if (self->buffer) { - handle_and_push_buffer (self, self->buffer, TRUE); - self->buffer = NULL; - } - if (self->list) { - handle_and_push_buffer_list (self, self->list, TRUE); - self->list = NULL; - } + default: + break; + } + + /* enqueue serialized events if there is a cached buffer */ + if (GST_EVENT_IS_SERIALIZED (event) && (self->buffer || self->list)) { + GST_DEBUG ("enqueueing serialized event"); + g_queue_push_tail (self->event_queue, event); + event = NULL; + goto out; + } + + /* handle rest of the events */ + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEGMENT: + gst_event_copy_segment (event, &self->segment); break; default: break; } - return gst_pad_event_default (pad, parent, event); +out: + if (drop) + gst_event_unref (event); + else if (event) + ret = gst_pad_event_default (pad, parent, event); + + return ret; } static void @@ -278,10 +360,11 @@ gst_rtp_onvif_timestamp_init (GstRtpOnvifTimestamp * self) self->prop_ntp_offset = DEFAULT_NTP_OFFSET; self->prop_set_e_bit = DEFAULT_SET_E_BIT; + gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED); + + self->event_queue = g_queue_new (); self->buffer = NULL; self->list = NULL; - - gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED); } #define EXTENSION_ID 0xABAC @@ -414,7 +497,7 @@ done: return TRUE; } -/* @buf: (transfer all) */ +/* @buf: (transfer full) */ static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self, GstBuffer * buf, gboolean end_contiguous) @@ -439,20 +522,15 @@ gst_rtp_onvif_timestamp_chain (GstPad * pad, GstObject * parent, return handle_and_push_buffer (self, buf, FALSE); } - /* We have to wait for the *next* buffer before pushing this one */ + /* send any previously cached item(s), this leaves an empty queue */ + result = send_cached_buffer_and_events (self, GST_BUFFER_IS_DISCONT (buf)); - if (self->buffer) { - /* push the *previous* buffer received */ - result = handle_and_push_buffer (self, self->buffer, - GST_BUFFER_IS_DISCONT (buf)); - } - - /* Transfer ownership */ + /* enqueue the new item, as the only item in the queue */ self->buffer = buf; return result; } -/* @buf: (transfer all) */ +/* @buf: (transfer full) */ static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self, GstBufferList * list, gboolean end_contiguous) @@ -477,24 +555,18 @@ gst_rtp_onvif_timestamp_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list) { GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent); - GstFlowReturn result = GST_FLOW_OK; GstBuffer *buf; + GstFlowReturn result = GST_FLOW_OK; if (!self->prop_set_e_bit) { return handle_and_push_buffer_list (self, list, FALSE); } - /* We have to wait for the *next* list before pushing this one */ - - if (self->list) { - /* push the *previous* list received */ - buf = gst_buffer_list_get (list, 0); - - result = handle_and_push_buffer_list (self, self->list, - GST_BUFFER_IS_DISCONT (buf)); - } + /* send any previously cached item(s), this leaves an empty queue */ + buf = gst_buffer_list_get (list, 0); + result = send_cached_buffer_and_events (self, GST_BUFFER_IS_DISCONT (buf)); - /* Transfer ownership */ + /* enqueue the new item, as the only item in the queue */ self->list = list; return result; } diff --git a/gst/onvif/gstrtponviftimestamp.h b/gst/onvif/gstrtponviftimestamp.h index 40ad8d377..b8e2b8e3e 100644 --- a/gst/onvif/gstrtponviftimestamp.h +++ b/gst/onvif/gstrtponviftimestamp.h @@ -55,8 +55,8 @@ struct _GstRtpOnvifTimestamp { GstClockTime ntp_offset; GstSegment segment; - gboolean received_segment; /* Buffer waiting to be handled, only used if prop_set_e_bit is TRUE */ + GQueue *event_queue; GstBuffer *buffer; GstBufferList *list; }; |