From c148ecf2cbb910f86564b0caf4240069d041de95 Mon Sep 17 00:00:00 2001 From: Guillaume Desmottes Date: Mon, 22 Feb 2021 13:17:18 +0100 Subject: appsrc: serialize custom events with buffers flow Application may want to inject events to the pipeline and keep them synchronized with the buffers flow. Fix #247 Part-of: --- gst-libs/gst/app/gstappsrc.c | 87 +++++++++++++++++++++++++++++++------------ tests/check/elements/appsrc.c | 73 ++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 23 deletions(-) diff --git a/gst-libs/gst/app/gstappsrc.c b/gst-libs/gst/app/gstappsrc.c index 457220d02..ce0875d61 100644 --- a/gst-libs/gst/app/gstappsrc.c +++ b/gst-libs/gst/app/gstappsrc.c @@ -1014,6 +1014,17 @@ gst_app_src_send_event (GstElement * element, GstEvent * event) g_mutex_unlock (&priv->mutex); break; default: + if (GST_EVENT_IS_SERIALIZED (event)) { + GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event); + g_mutex_lock (&priv->mutex); + gst_queue_array_push_tail (priv->queue, event); + + if ((priv->wait_status & STREAM_WAITING)) + g_cond_broadcast (&priv->cond); + + g_mutex_unlock (&priv->mutex); + return TRUE; + } break; } @@ -1596,6 +1607,11 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, } while (TRUE) { + /* Our lock may have been release to push events or caps, check out + * state in case we are now flushing. */ + if (G_UNLIKELY (priv->flushing)) + goto flushing; + /* return data as long as we have some */ if (!gst_queue_array_is_empty (priv->queue)) { GstMiniObject *obj = gst_queue_array_pop_head (priv->queue); @@ -1618,13 +1634,6 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, if (caps_changed) gst_app_src_do_negotiate (bsrc); - /* Lock has released so now may need - *- flushing - *- new caps change - *- check queue has data */ - if (G_UNLIKELY (priv->flushing)) - goto flushing; - /* Continue checks caps and queue */ continue; } @@ -1661,24 +1670,56 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, *buf = NULL; } else if (GST_IS_EVENT (obj)) { GstEvent *event = GST_EVENT (obj); - const GstSegment *segment = NULL; - - gst_event_parse_segment (event, &segment); - g_assert (segment != NULL); - - if (!gst_segment_is_equal (&priv->current_segment, segment)) { - GST_DEBUG_OBJECT (appsrc, - "Update new segment %" GST_PTR_FORMAT, event); - if (!gst_base_src_new_segment (bsrc, segment)) { - GST_ERROR_OBJECT (appsrc, - "Couldn't set new segment %" GST_PTR_FORMAT, event); - gst_event_unref (event); - goto invalid_segment; + + GST_DEBUG_OBJECT (appsrc, "pop event %" GST_PTR_FORMAT, event); + + if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { + const GstSegment *segment = NULL; + + gst_event_parse_segment (event, &segment); + g_assert (segment != NULL); + + if (!gst_segment_is_equal (&priv->current_segment, segment)) { + GST_DEBUG_OBJECT (appsrc, + "Update new segment %" GST_PTR_FORMAT, event); + if (!gst_base_src_new_segment (bsrc, segment)) { + GST_ERROR_OBJECT (appsrc, + "Couldn't set new segment %" GST_PTR_FORMAT, event); + gst_event_unref (event); + goto invalid_segment; + } + gst_segment_copy_into (segment, &priv->current_segment); } - gst_segment_copy_into (segment, &priv->current_segment); - } - gst_event_unref (event); + gst_event_unref (event); + } else { + GstEvent *seg_event; + GstSegment last_segment = priv->last_segment; + + /* event is serialized with the buffers flow */ + + /* We are about to push an event, release out lock */ + g_mutex_unlock (&priv->mutex); + + seg_event = + gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc), + GST_EVENT_SEGMENT, 0); + if (!seg_event) { + seg_event = gst_event_new_segment (&last_segment); + + GST_DEBUG_OBJECT (appsrc, + "received serialized event before first buffer, push default segment %" + GST_PTR_FORMAT, seg_event); + + gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event); + } else { + gst_event_unref (seg_event); + } + + gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event); + + g_mutex_lock (&priv->mutex); + } continue; } else { g_assert_not_reached (); diff --git a/tests/check/elements/appsrc.c b/tests/check/elements/appsrc.c index ae25429d0..732c5d236 100644 --- a/tests/check/elements/appsrc.c +++ b/tests/check/elements/appsrc.c @@ -1368,6 +1368,78 @@ GST_START_TEST (test_appsrc_limits) GST_END_TEST; +static GstFlowReturn +send_event_chain_func (GstPad * pad, GstObject * parent, GstBuffer * buf) +{ + GST_LOG (" buffer # %3u", (guint) GST_BUFFER_OFFSET (buf)); + + fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset); + ++expect_offset; + gst_buffer_unref (buf); + + if (expect_offset == 2) { + /* test is done */ + g_mutex_lock (&check_mutex); + done = TRUE; + g_cond_signal (&check_cond); + g_mutex_unlock (&check_mutex); + } + + return GST_FLOW_OK; +} + +static gboolean +send_event_event_func (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GST_LOG ("event %" GST_PTR_FORMAT, event); + if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM) { + /* this event should arrive after the first buffer */ + fail_unless_equals_int (expect_offset, 1); + } + gst_event_unref (event); + return TRUE; +} + +/* check that custom downstream events are properly serialized with buffers */ +GST_START_TEST (test_appsrc_send_custom_event) +{ + GstElement *src; + GstBuffer *buf; + + src = setup_appsrc (); + + ASSERT_SET_STATE (src, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); + + expect_offset = 0; + gst_pad_set_chain_function (mysinkpad, send_event_chain_func); + gst_pad_set_event_function (mysinkpad, send_event_event_func); + + /* send a buffer, a custom event and a second buffer */ + buf = gst_buffer_new_and_alloc (1); + GST_BUFFER_OFFSET (buf) = 0; + fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src), + buf) == GST_FLOW_OK); + + gst_element_send_event (src, + gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, + gst_structure_new ("custom", NULL, NULL))); + + buf = gst_buffer_new_and_alloc (2); + GST_BUFFER_OFFSET (buf) = 1; + fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src), + buf) == GST_FLOW_OK); + + g_mutex_lock (&check_mutex); + while (!done) + g_cond_wait (&check_cond, &check_mutex); + g_mutex_unlock (&check_mutex); + + ASSERT_SET_STATE (src, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + cleanup_appsrc (src); +} + +GST_END_TEST; + static Suite * appsrc_suite (void) { @@ -1382,6 +1454,7 @@ appsrc_suite (void) tcase_add_test (tc_chain, test_appsrc_period_with_custom_segment); tcase_add_test (tc_chain, test_appsrc_custom_segment_twice); tcase_add_test (tc_chain, test_appsrc_limits); + tcase_add_test (tc_chain, test_appsrc_send_custom_event); if (RUNNING_ON_VALGRIND) tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5); -- cgit v1.2.1