summaryrefslogtreecommitdiff
path: root/gst
diff options
context:
space:
mode:
authorLinus Svensson <linussn@axis.com>2015-11-06 09:44:16 +0100
committerOlivier CrĂȘte <olivier.crete@collabora.com>2015-11-06 12:55:25 -0500
commita58826292eab80e83b9de525753895e56410e301 (patch)
tree96e33c811d871323bf24388651d338dbcc0f47fa /gst
parentfd0ca0a9723931463c8be93517d13b5ef799f6f0 (diff)
downloadgstreamer-plugins-bad-a58826292eab80e83b9de525753895e56410e301.tar.gz
rtponviftimestamp: Do not rearange order of data
If a buffer or a buffer list is cached, no events serialized with the data stream should get through. The cached buffers and events should be purged when we stop flushing. https://bugzilla.gnome.org/show_bug.cgi?id=757688
Diffstat (limited to 'gst')
-rw-r--r--gst/onvif/gstrtponviftimestamp.c164
-rw-r--r--gst/onvif/gstrtponviftimestamp.h2
2 files changed, 119 insertions, 47 deletions
diff --git a/gst/onvif/gstrtponviftimestamp.c b/gst/onvif/gstrtponviftimestamp.c
index 818c8afc6..b539c83aa 100644
--- a/gst/onvif/gstrtponviftimestamp.c
+++ b/gst/onvif/gstrtponviftimestamp.c
@@ -42,6 +42,11 @@ static GstFlowReturn gst_rtp_onvif_timestamp_chain (GstPad * pad,
static GstFlowReturn gst_rtp_onvif_timestamp_chain_list (GstPad * pad,
GstObject * parent, GstBufferList * list);
+static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self,
+ GstBuffer * buf, gboolean end_contiguous);
+static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self,
+ GstBufferList * list, gboolean end_contiguous);
+
static GstStaticPadTemplate sink_template_factory =
GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
@@ -112,6 +117,65 @@ gst_rtp_onvif_timestamp_set_property (GObject * object,
}
}
+/* send cached buffer or list, and events, if present */
+static GstFlowReturn
+send_cached_buffer_and_events (GstRtpOnvifTimestamp * self,
+ gboolean end_contiguous)
+{
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ g_assert (!(self->buffer && self->list));
+
+ if (self->buffer) {
+ GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->buffer);
+ ret = handle_and_push_buffer (self, self->buffer, end_contiguous);
+ self->buffer = NULL;
+ }
+ if (self->list) {
+ GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->list);
+ ret = handle_and_push_buffer_list (self, self->list, end_contiguous);
+ self->list = NULL;
+ }
+
+ if (ret != GST_FLOW_OK)
+ goto out;
+
+ while (!g_queue_is_empty (self->event_queue)) {
+ GstEvent *event;
+
+ event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue));
+ GST_LOG_OBJECT (self->sinkpad, "sending %" GST_PTR_FORMAT, event);
+ (void) gst_pad_send_event (self->sinkpad, event);
+ }
+
+out:
+ return ret;
+}
+
+static void
+purge_cached_buffer_and_events (GstRtpOnvifTimestamp * self)
+{
+ g_assert (!(self->buffer && self->list));
+
+ if (self->buffer) {
+ GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->buffer);
+ gst_buffer_unref (self->buffer);
+ self->buffer = NULL;
+ }
+ if (self->list) {
+ GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->list);
+ gst_buffer_list_unref (self->list);
+ self->list = NULL;
+ }
+
+ while (!g_queue_is_empty (self->event_queue)) {
+ GstEvent *event;
+
+ event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue));
+ gst_event_unref (event);
+ }
+}
+
static GstStateChangeReturn
gst_rtp_onvif_timestamp_change_state (GstElement * element,
GstStateChange transition)
@@ -121,6 +185,7 @@ gst_rtp_onvif_timestamp_change_state (GstElement * element,
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
+ purge_cached_buffer_and_events (self);
gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
break;
default:
@@ -161,10 +226,7 @@ gst_rtp_onvif_timestamp_finalize (GObject * object)
{
GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (object);
- if (self->buffer)
- gst_buffer_unref (self->buffer);
- if (self->list)
- gst_buffer_list_unref (self->list);
+ g_queue_free (self->event_queue);
G_OBJECT_CLASS (gst_rtp_onvif_timestamp_parent_class)->finalize (object);
}
@@ -219,42 +281,62 @@ gst_rtp_onvif_timestamp_class_init (GstRtpOnvifTimestampClass * klass)
0, "ONVIF NTP timestamps RTP extension");
}
-static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self,
- GstBuffer * buf, gboolean end_contiguous);
-static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self,
- GstBufferList * list, gboolean end_contiguous);
-
static gboolean
gst_rtp_onvif_timestamp_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event)
{
GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent);
+ gboolean drop = FALSE;
+ gboolean ret = TRUE;
GST_DEBUG_OBJECT (pad, "handling event %s", GST_EVENT_TYPE_NAME (event));
+ /* handle serialized events, which, should not be enqueued */
switch (GST_EVENT_TYPE (event)) {
- case GST_EVENT_SEGMENT:
- gst_event_copy_segment (event, &self->segment);
- break;
+ case GST_EVENT_EOS:
+ {
+ GstFlowReturn res;
+ /* Push pending buffers, if any */
+ res = send_cached_buffer_and_events (self, TRUE);
+ if (res != GST_FLOW_OK) {
+ drop = TRUE;
+ ret = FALSE;
+ goto out;
+ }
+ break;
+ }
case GST_EVENT_FLUSH_STOP:
+ purge_cached_buffer_and_events (self);
gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
break;
- case GST_EVENT_EOS:
- /* Push pending buffers, if any */
- if (self->buffer) {
- handle_and_push_buffer (self, self->buffer, TRUE);
- self->buffer = NULL;
- }
- if (self->list) {
- handle_and_push_buffer_list (self, self->list, TRUE);
- self->list = NULL;
- }
+ default:
+ break;
+ }
+
+ /* enqueue serialized events if there is a cached buffer */
+ if (GST_EVENT_IS_SERIALIZED (event) && (self->buffer || self->list)) {
+ GST_DEBUG ("enqueueing serialized event");
+ g_queue_push_tail (self->event_queue, event);
+ event = NULL;
+ goto out;
+ }
+
+ /* handle rest of the events */
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_SEGMENT:
+ gst_event_copy_segment (event, &self->segment);
break;
default:
break;
}
- return gst_pad_event_default (pad, parent, event);
+out:
+ if (drop)
+ gst_event_unref (event);
+ else if (event)
+ ret = gst_pad_event_default (pad, parent, event);
+
+ return ret;
}
static void
@@ -278,10 +360,11 @@ gst_rtp_onvif_timestamp_init (GstRtpOnvifTimestamp * self)
self->prop_ntp_offset = DEFAULT_NTP_OFFSET;
self->prop_set_e_bit = DEFAULT_SET_E_BIT;
+ gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
+
+ self->event_queue = g_queue_new ();
self->buffer = NULL;
self->list = NULL;
-
- gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
}
#define EXTENSION_ID 0xABAC
@@ -414,7 +497,7 @@ done:
return TRUE;
}
-/* @buf: (transfer all) */
+/* @buf: (transfer full) */
static GstFlowReturn
handle_and_push_buffer (GstRtpOnvifTimestamp * self, GstBuffer * buf,
gboolean end_contiguous)
@@ -439,20 +522,15 @@ gst_rtp_onvif_timestamp_chain (GstPad * pad, GstObject * parent,
return handle_and_push_buffer (self, buf, FALSE);
}
- /* We have to wait for the *next* buffer before pushing this one */
+ /* send any previously cached item(s), this leaves an empty queue */
+ result = send_cached_buffer_and_events (self, GST_BUFFER_IS_DISCONT (buf));
- if (self->buffer) {
- /* push the *previous* buffer received */
- result = handle_and_push_buffer (self, self->buffer,
- GST_BUFFER_IS_DISCONT (buf));
- }
-
- /* Transfer ownership */
+ /* enqueue the new item, as the only item in the queue */
self->buffer = buf;
return result;
}
-/* @buf: (transfer all) */
+/* @buf: (transfer full) */
static GstFlowReturn
handle_and_push_buffer_list (GstRtpOnvifTimestamp * self,
GstBufferList * list, gboolean end_contiguous)
@@ -477,24 +555,18 @@ gst_rtp_onvif_timestamp_chain_list (GstPad * pad, GstObject * parent,
GstBufferList * list)
{
GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent);
- GstFlowReturn result = GST_FLOW_OK;
GstBuffer *buf;
+ GstFlowReturn result = GST_FLOW_OK;
if (!self->prop_set_e_bit) {
return handle_and_push_buffer_list (self, list, FALSE);
}
- /* We have to wait for the *next* list before pushing this one */
-
- if (self->list) {
- /* push the *previous* list received */
- buf = gst_buffer_list_get (list, 0);
-
- result = handle_and_push_buffer_list (self, self->list,
- GST_BUFFER_IS_DISCONT (buf));
- }
+ /* send any previously cached item(s), this leaves an empty queue */
+ buf = gst_buffer_list_get (list, 0);
+ result = send_cached_buffer_and_events (self, GST_BUFFER_IS_DISCONT (buf));
- /* Transfer ownership */
+ /* enqueue the new item, as the only item in the queue */
self->list = list;
return result;
}
diff --git a/gst/onvif/gstrtponviftimestamp.h b/gst/onvif/gstrtponviftimestamp.h
index 40ad8d377..b8e2b8e3e 100644
--- a/gst/onvif/gstrtponviftimestamp.h
+++ b/gst/onvif/gstrtponviftimestamp.h
@@ -55,8 +55,8 @@ struct _GstRtpOnvifTimestamp {
GstClockTime ntp_offset;
GstSegment segment;
- gboolean received_segment;
/* Buffer waiting to be handled, only used if prop_set_e_bit is TRUE */
+ GQueue *event_queue;
GstBuffer *buffer;
GstBufferList *list;
};