summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuillaume Desmottes <guillaume.desmottes@collabora.com>2021-02-22 13:17:18 +0100
committerGuillaume Desmottes <guillaume.desmottes@collabora.com>2021-07-22 13:56:22 +0200
commitc148ecf2cbb910f86564b0caf4240069d041de95 (patch)
tree6395a421592312d3bd76c4259a642b9b70c72e78
parent0a657d6db5ba912b13092a907ea507638cd01cf9 (diff)
downloadgstreamer-plugins-base-c148ecf2cbb910f86564b0caf4240069d041de95.tar.gz
appsrc: serialize custom events with buffers flow
Application may want to inject events to the pipeline and keep them synchronized with the buffers flow. Fix #247 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1046>
-rw-r--r--gst-libs/gst/app/gstappsrc.c87
-rw-r--r--tests/check/elements/appsrc.c73
2 files changed, 137 insertions, 23 deletions
diff --git a/gst-libs/gst/app/gstappsrc.c b/gst-libs/gst/app/gstappsrc.c
index 457220d02..ce0875d61 100644
--- a/gst-libs/gst/app/gstappsrc.c
+++ b/gst-libs/gst/app/gstappsrc.c
@@ -1014,6 +1014,17 @@ gst_app_src_send_event (GstElement * element, GstEvent * event)
g_mutex_unlock (&priv->mutex);
break;
default:
+ if (GST_EVENT_IS_SERIALIZED (event)) {
+ GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event);
+ g_mutex_lock (&priv->mutex);
+ gst_queue_array_push_tail (priv->queue, event);
+
+ if ((priv->wait_status & STREAM_WAITING))
+ g_cond_broadcast (&priv->cond);
+
+ g_mutex_unlock (&priv->mutex);
+ return TRUE;
+ }
break;
}
@@ -1596,6 +1607,11 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
}
while (TRUE) {
+ /* Our lock may have been release to push events or caps, check out
+ * state in case we are now flushing. */
+ if (G_UNLIKELY (priv->flushing))
+ goto flushing;
+
/* return data as long as we have some */
if (!gst_queue_array_is_empty (priv->queue)) {
GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
@@ -1618,13 +1634,6 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
if (caps_changed)
gst_app_src_do_negotiate (bsrc);
- /* Lock has released so now may need
- *- flushing
- *- new caps change
- *- check queue has data */
- if (G_UNLIKELY (priv->flushing))
- goto flushing;
-
/* Continue checks caps and queue */
continue;
}
@@ -1661,24 +1670,56 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
*buf = NULL;
} else if (GST_IS_EVENT (obj)) {
GstEvent *event = GST_EVENT (obj);
- const GstSegment *segment = NULL;
-
- gst_event_parse_segment (event, &segment);
- g_assert (segment != NULL);
-
- if (!gst_segment_is_equal (&priv->current_segment, segment)) {
- GST_DEBUG_OBJECT (appsrc,
- "Update new segment %" GST_PTR_FORMAT, event);
- if (!gst_base_src_new_segment (bsrc, segment)) {
- GST_ERROR_OBJECT (appsrc,
- "Couldn't set new segment %" GST_PTR_FORMAT, event);
- gst_event_unref (event);
- goto invalid_segment;
+
+ GST_DEBUG_OBJECT (appsrc, "pop event %" GST_PTR_FORMAT, event);
+
+ if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
+ const GstSegment *segment = NULL;
+
+ gst_event_parse_segment (event, &segment);
+ g_assert (segment != NULL);
+
+ if (!gst_segment_is_equal (&priv->current_segment, segment)) {
+ GST_DEBUG_OBJECT (appsrc,
+ "Update new segment %" GST_PTR_FORMAT, event);
+ if (!gst_base_src_new_segment (bsrc, segment)) {
+ GST_ERROR_OBJECT (appsrc,
+ "Couldn't set new segment %" GST_PTR_FORMAT, event);
+ gst_event_unref (event);
+ goto invalid_segment;
+ }
+ gst_segment_copy_into (segment, &priv->current_segment);
}
- gst_segment_copy_into (segment, &priv->current_segment);
- }
- gst_event_unref (event);
+ gst_event_unref (event);
+ } else {
+ GstEvent *seg_event;
+ GstSegment last_segment = priv->last_segment;
+
+ /* event is serialized with the buffers flow */
+
+ /* We are about to push an event, release out lock */
+ g_mutex_unlock (&priv->mutex);
+
+ seg_event =
+ gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
+ GST_EVENT_SEGMENT, 0);
+ if (!seg_event) {
+ seg_event = gst_event_new_segment (&last_segment);
+
+ GST_DEBUG_OBJECT (appsrc,
+ "received serialized event before first buffer, push default segment %"
+ GST_PTR_FORMAT, seg_event);
+
+ gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event);
+ } else {
+ gst_event_unref (seg_event);
+ }
+
+ gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);
+
+ g_mutex_lock (&priv->mutex);
+ }
continue;
} else {
g_assert_not_reached ();
diff --git a/tests/check/elements/appsrc.c b/tests/check/elements/appsrc.c
index ae25429d0..732c5d236 100644
--- a/tests/check/elements/appsrc.c
+++ b/tests/check/elements/appsrc.c
@@ -1368,6 +1368,78 @@ GST_START_TEST (test_appsrc_limits)
GST_END_TEST;
+static GstFlowReturn
+send_event_chain_func (GstPad * pad, GstObject * parent, GstBuffer * buf)
+{
+ GST_LOG (" buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
+
+ fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
+ ++expect_offset;
+ gst_buffer_unref (buf);
+
+ if (expect_offset == 2) {
+ /* test is done */
+ g_mutex_lock (&check_mutex);
+ done = TRUE;
+ g_cond_signal (&check_cond);
+ g_mutex_unlock (&check_mutex);
+ }
+
+ return GST_FLOW_OK;
+}
+
+static gboolean
+send_event_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GST_LOG ("event %" GST_PTR_FORMAT, event);
+ if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM) {
+ /* this event should arrive after the first buffer */
+ fail_unless_equals_int (expect_offset, 1);
+ }
+ gst_event_unref (event);
+ return TRUE;
+}
+
+/* check that custom downstream events are properly serialized with buffers */
+GST_START_TEST (test_appsrc_send_custom_event)
+{
+ GstElement *src;
+ GstBuffer *buf;
+
+ src = setup_appsrc ();
+
+ ASSERT_SET_STATE (src, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
+
+ expect_offset = 0;
+ gst_pad_set_chain_function (mysinkpad, send_event_chain_func);
+ gst_pad_set_event_function (mysinkpad, send_event_event_func);
+
+ /* send a buffer, a custom event and a second buffer */
+ buf = gst_buffer_new_and_alloc (1);
+ GST_BUFFER_OFFSET (buf) = 0;
+ fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src),
+ buf) == GST_FLOW_OK);
+
+ gst_element_send_event (src,
+ gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+ gst_structure_new ("custom", NULL, NULL)));
+
+ buf = gst_buffer_new_and_alloc (2);
+ GST_BUFFER_OFFSET (buf) = 1;
+ fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src),
+ buf) == GST_FLOW_OK);
+
+ g_mutex_lock (&check_mutex);
+ while (!done)
+ g_cond_wait (&check_cond, &check_mutex);
+ g_mutex_unlock (&check_mutex);
+
+ ASSERT_SET_STATE (src, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+ cleanup_appsrc (src);
+}
+
+GST_END_TEST;
+
static Suite *
appsrc_suite (void)
{
@@ -1382,6 +1454,7 @@ appsrc_suite (void)
tcase_add_test (tc_chain, test_appsrc_period_with_custom_segment);
tcase_add_test (tc_chain, test_appsrc_custom_segment_twice);
tcase_add_test (tc_chain, test_appsrc_limits);
+ tcase_add_test (tc_chain, test_appsrc_send_custom_event);
if (RUNNING_ON_VALGRIND)
tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);