/* A set of utility functions that are common between elements * based upon GstAdaptiveDemux * * Copyright (c) <2015> YouView TV Ltd * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ #include #include "adaptive_demux_engine.h" #include "adaptive_demux_common.h" #define GST_TEST_HTTP_SRC_NAME "testhttpsrc" #define gst_adaptive_demux_test_case_parent_class parent_class static void gst_adaptive_demux_test_case_dispose (GObject * object); static void gst_adaptive_demux_test_case_finalize (GObject * object); static void gst_adaptive_demux_test_case_clear (GstAdaptiveDemuxTestCase * testData); G_DEFINE_TYPE (GstAdaptiveDemuxTestCase, gst_adaptive_demux_test_case, G_TYPE_OBJECT); static void gst_adaptive_demux_test_case_class_init (GstAdaptiveDemuxTestCaseClass * klass) { GObjectClass *object = G_OBJECT_CLASS (klass); object->dispose = gst_adaptive_demux_test_case_dispose; object->finalize = gst_adaptive_demux_test_case_finalize; } static void gst_adaptive_demux_test_case_init (GstAdaptiveDemuxTestCase * testData) { testData->output_streams = NULL; testData->test_task = NULL; g_rec_mutex_init (&testData->test_task_lock); g_mutex_init (&testData->test_task_state_lock); g_cond_init (&testData->test_task_state_cond); gst_adaptive_demux_test_case_clear (testData); } static void gst_adaptive_demux_test_case_clear (GstAdaptiveDemuxTestCase * testData) { if (testData->output_streams) { g_list_free (testData->output_streams); testData->output_streams = NULL; } testData->count_of_finished_streams = 0; if (testData->test_task) { gst_task_stop (testData->test_task); gst_task_join (testData->test_task); gst_object_unref (testData->test_task); testData->test_task = NULL; } testData->signal_context = NULL; testData->test_task_state = TEST_TASK_STATE_NOT_STARTED; testData->threshold_for_seek = 0; gst_event_replace (&testData->seek_event, NULL); testData->signal_context = NULL; } static void gst_adaptive_demux_test_case_dispose (GObject * object) { GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (object); gst_adaptive_demux_test_case_clear (testData); GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object)); } static void gst_adaptive_demux_test_case_finalize (GObject * object) { GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (object); g_cond_clear (&testData->test_task_state_cond); g_mutex_clear (&testData->test_task_state_lock); g_rec_mutex_clear (&testData->test_task_lock); if (testData->test_task) { gst_task_stop (testData->test_task); gst_task_join (testData->test_task); gst_object_unref (testData->test_task); testData->test_task = NULL; } if (testData->output_streams) { g_list_free (testData->output_streams); testData->output_streams = NULL; } GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object)); } /** * gst_adaptive_demux_test_case_new: * * Creates a new #GstAdaptiveDemuxTestCase. Free with g_object_unref(). * * Returns: (transfer full): a new #GstAdaptiveDemuxTestCase */ GstAdaptiveDemuxTestCase * gst_adaptive_demux_test_case_new (void) { return g_object_new (GST_TYPE_ADAPTIVE_DEMUX_TEST_CASE, NULL); } GstAdaptiveDemuxTestExpectedOutput * gst_adaptive_demux_test_find_test_data_by_stream (GstAdaptiveDemuxTestCase * testData, GstAdaptiveDemuxTestOutputStream * stream, guint * index) { gchar *pad_name; GstAdaptiveDemuxTestExpectedOutput *ret = NULL; guint count = 0; GList *walk; pad_name = gst_pad_get_name (stream->pad); fail_unless (pad_name != NULL); for (walk = testData->output_streams; walk; walk = g_list_next (walk)) { GstAdaptiveDemuxTestExpectedOutput *td = walk->data; if (strcmp (td->name, pad_name) == 0) { ret = td; if (index) *index = count; } ++count; } g_free (pad_name); return ret; } /* function to validate data received by AppSink */ gboolean gst_adaptive_demux_test_check_received_data (GstAdaptiveDemuxTestEngine * engine, GstAdaptiveDemuxTestOutputStream * stream, GstBuffer * buffer, gpointer user_data) { GstMapInfo info; guint pattern; guint64 streamOffset; GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData; guint64 i; fail_unless (stream != NULL); fail_unless (engine->pipeline != NULL); testOutputStreamData = gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, NULL); fail_unless (testOutputStreamData != NULL); GST_DEBUG ("total_received_size=%" G_GUINT64_FORMAT " segment_received_size = %" G_GUINT64_FORMAT " buffer_size=%" G_GUINT64_FORMAT " expected_size=%" G_GUINT64_FORMAT " segment_start = %" G_GUINT64_FORMAT, stream->total_received_size, stream->segment_received_size, (guint64) gst_buffer_get_size (buffer), testOutputStreamData->expected_size, stream->segment_start); /* Only verify after seeking */ if (testData->seek_event && testData->seeked) fail_unless (stream->total_received_size + stream->segment_received_size + gst_buffer_get_size (buffer) <= testOutputStreamData->expected_size, "Received unexpected data, please check what segments are being downloaded"); streamOffset = stream->segment_start + stream->segment_received_size; if (testOutputStreamData->expected_data) { gsize size = gst_buffer_get_size (buffer); if (gst_buffer_memcmp (buffer, 0, &testOutputStreamData->expected_data[streamOffset], size) == 0) { return TRUE; } /* If buffers do not match, fall back to a slower byte-based check so that the test can output the position where the received data diverges from expected_data */ } gst_buffer_map (buffer, &info, GST_MAP_READ); pattern = streamOffset - streamOffset % sizeof (pattern); for (i = 0; i != info.size; ++i) { guint received = info.data[i]; guint expected; if (testOutputStreamData->expected_data) { fail_unless (streamOffset + i < testOutputStreamData->expected_size); expected = testOutputStreamData->expected_data[streamOffset + i]; } else { gchar pattern_byte_to_read; pattern_byte_to_read = (streamOffset + i) % sizeof (pattern); if (pattern_byte_to_read == 0) { pattern = streamOffset + i; } expected = (pattern >> (pattern_byte_to_read * 8)) & 0xFF; #if 0 GST_DEBUG ("received '0x%02x' expected '0x%02x' offset %" G_GUINT64_FORMAT " pattern=%08x byte_to_read=%d", received, expected, i, pattern, pattern_byte_to_read); #endif } fail_unless (received == expected, "output validation failed: received '0x%02x' expected '0x%02x' byte %" G_GUINT64_FORMAT " offset=%" G_GUINT64_FORMAT "\n", received, expected, i, streamOffset); } gst_buffer_unmap (buffer, &info); return TRUE; } /* AppSink EOS callback. * To be used by tests that don't expect AppSink to receive EOS. */ void gst_adaptive_demux_test_unexpected_eos (GstAdaptiveDemuxTestEngine * engine, GstAdaptiveDemuxTestOutputStream * stream, gpointer user_data) { fail_if (TRUE); } /* AppSink EOS callback. * To be used by tests that expect AppSink to receive EOS. * Will check total size of data received by AppSink. */ void gst_adaptive_demux_test_check_size_of_received_data (GstAdaptiveDemuxTestEngine * engine, GstAdaptiveDemuxTestOutputStream * stream, gpointer user_data) { GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData; testOutputStreamData = gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, NULL); fail_unless (testOutputStreamData != NULL); fail_unless (stream->total_received_size == testOutputStreamData->expected_size, "size validation failed, expected %d received %d", testOutputStreamData->expected_size, stream->total_received_size); testData->count_of_finished_streams++; if (testData->count_of_finished_streams == g_list_length (testData->output_streams)) { g_main_loop_quit (engine->loop); } } typedef struct _SeekTaskContext { GstElement *pipeline; GstTask *task; GstEvent *seek_event; } SeekTaskContext; /* function to generate a seek event. Will be run in a separate thread */ static void testSeekTaskDoSeek (gpointer user_data) { SeekTaskContext *context = (SeekTaskContext *) user_data; GstTask *task; GST_DEBUG ("testSeekTaskDoSeek calling seek"); fail_unless (GST_IS_EVENT (context->seek_event)); fail_unless (GST_EVENT_TYPE (context->seek_event) == GST_EVENT_SEEK); if (!gst_element_send_event (GST_ELEMENT (context->pipeline), context->seek_event)) fail ("Seek failed!\n"); GST_DEBUG ("seek ok"); task = context->task; g_slice_free (SeekTaskContext, context); gst_task_stop (task); } /* function to be called during seek test when demux sends data to AppSink * It monitors the data sent and after a while will generate a seek request. */ static gboolean testSeekAdaptiveDemuxSendsData (GstAdaptiveDemuxTestEngine * engine, GstAdaptiveDemuxTestOutputStream * stream, GstBuffer * buffer, gpointer user_data) { GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); SeekTaskContext *seekContext; GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData; guint index = 0; testOutputStreamData = gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, &index); fail_unless (testOutputStreamData != NULL); /* first entry in testData->output_streams is the PAD on which to perform the seek */ if (index == 0 && testData->test_task == NULL && (stream->total_received_size + stream->segment_received_size) >= testData->threshold_for_seek) { GstSeekFlags seek_flags; testData->threshold_for_seek = stream->total_received_size + stream->segment_received_size; gst_event_parse_seek (testData->seek_event, NULL, NULL, &seek_flags, NULL, NULL, NULL, NULL); if (seek_flags & GST_SEEK_FLAG_FLUSH) testOutputStreamData->expected_size += testData->threshold_for_seek; GST_DEBUG ("starting seek task"); g_mutex_lock (&testData->test_task_state_lock); testData->test_task_state = TEST_TASK_STATE_WAITING_FOR_TESTSRC_STATE_CHANGE; g_mutex_unlock (&testData->test_task_state_lock); seekContext = g_slice_new (SeekTaskContext); seekContext->pipeline = engine->pipeline; seekContext->seek_event = gst_event_ref (testData->seek_event); testData->test_task = seekContext->task = gst_task_new ((GstTaskFunction) testSeekTaskDoSeek, seekContext, NULL); gst_task_set_lock (testData->test_task, &testData->test_task_lock); gst_task_start (testData->test_task); GST_DEBUG ("seek task started"); if (seek_flags & GST_SEEK_FLAG_FLUSH) { g_mutex_lock (&testData->test_task_state_lock); GST_DEBUG ("waiting for seek task to change state on testsrc"); /* wait for test_task to run, send a flush start event to AppSink * and change the testhttpsrc element state from PLAYING to PAUSED */ while (testData->test_task_state == TEST_TASK_STATE_WAITING_FOR_TESTSRC_STATE_CHANGE) { g_cond_wait (&testData->test_task_state_cond, &testData->test_task_state_lock); } testData->seeked = TRUE; g_mutex_unlock (&testData->test_task_state_lock); /* we can continue now, but this buffer will be rejected by AppSink * because it is in flushing mode */ GST_DEBUG ("seek task changed state on testsrc, resuming"); } } return TRUE; } static void testSeekAdaptiveAppSinkEvent (GstAdaptiveDemuxTestEngine * engine, GstAdaptiveDemuxTestOutputStream * stream, GstEvent * event, gpointer user_data) { GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData; guint index = 0; testOutputStreamData = gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, &index); fail_unless (testOutputStreamData != NULL); if (testData->seek_event && GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT && testOutputStreamData->post_seek_segment.format != GST_FORMAT_UNDEFINED && gst_event_get_seqnum (event) == gst_event_get_seqnum (testData->seek_event)) { const GstSegment *seek_segment; gst_event_parse_segment (event, &seek_segment); fail_unless (seek_segment->format == testOutputStreamData->post_seek_segment.format); fail_unless (seek_segment->rate == testOutputStreamData->post_seek_segment.rate); fail_unless (seek_segment->start == testOutputStreamData->post_seek_segment.start); fail_unless (seek_segment->stop == testOutputStreamData->post_seek_segment.stop); fail_unless (seek_segment->base == testOutputStreamData->post_seek_segment.base); fail_unless (seek_segment->time == testOutputStreamData->post_seek_segment.time); testOutputStreamData->segment_verification_needed = FALSE; } } /* callback called when main_loop detects a state changed event */ static void testSeekOnStateChanged (GstBus * bus, GstMessage * msg, gpointer user_data) { GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); GstState old_state, new_state; const char *srcName = GST_OBJECT_NAME (msg->src); gst_message_parse_state_changed (msg, &old_state, &new_state, NULL); GST_DEBUG ("Element %s changed state from %s to %s", GST_OBJECT_NAME (msg->src), gst_element_state_get_name (old_state), gst_element_state_get_name (new_state)); if (strstr (srcName, "srcbin") == srcName && old_state == GST_STATE_PLAYING && new_state == GST_STATE_PAUSED) { g_mutex_lock (&testData->test_task_state_lock); if (testData->test_task_state == TEST_TASK_STATE_WAITING_FOR_TESTSRC_STATE_CHANGE) { GST_DEBUG ("changing test_task_state"); testData->test_task_state = TEST_TASK_STATE_EXITING; gst_bus_remove_signal_watch (bus); g_cond_signal (&testData->test_task_state_cond); } g_mutex_unlock (&testData->test_task_state_lock); } } /* * Issue a seek request after media segment has started to be downloaded * on the first pad listed in GstAdaptiveDemuxTestOutputStreamData and the * first chunk of at least one byte has already arrived in AppSink */ static void testSeekPreTestCallback (GstAdaptiveDemuxTestEngine * engine, gpointer user_data) { GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); GstBus *bus; /* register a callback to listen for state change events */ bus = gst_pipeline_get_bus (GST_PIPELINE (engine->pipeline)); gst_bus_add_signal_watch (bus); g_signal_connect (bus, "message::state-changed", G_CALLBACK (testSeekOnStateChanged), testData); gst_object_unref (bus); } static void testSeekPostTestCallback (GstAdaptiveDemuxTestEngine * engine, gpointer user_data) { GList *walk; GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); for (walk = testData->output_streams; walk; walk = g_list_next (walk)) { GstAdaptiveDemuxTestExpectedOutput *td = walk->data; fail_if (td->segment_verification_needed); } } /* function to check total size of data received by AppSink * will be called when AppSink receives eos. */ void gst_adaptive_demux_test_download_error_size_of_received_data (GstAdaptiveDemuxTestEngine * engine, GstAdaptiveDemuxTestOutputStream * stream, gpointer user_data) { GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData; testOutputStreamData = gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, NULL); fail_unless (testOutputStreamData != NULL); /* expect to receive more than 0 */ fail_unless (stream->total_received_size > 0, "size validation failed for %s, expected > 0, received %d", testOutputStreamData->name, stream->total_received_size); /* expect to receive less than file size */ fail_unless (stream->total_received_size < testOutputStreamData->expected_size, "size validation failed for %s, expected < %d received %d", testOutputStreamData->name, testOutputStreamData->expected_size, stream->total_received_size); if (testData->count_of_finished_streams == g_list_length (testData->output_streams)) { g_main_loop_quit (engine->loop); } } void gst_adaptive_demux_test_seek (const gchar * element_name, const gchar * manifest_uri, GstAdaptiveDemuxTestCase * testData) { GstAdaptiveDemuxTestCallbacks cb = { 0 }; cb.appsink_received_data = gst_adaptive_demux_test_check_received_data; cb.appsink_eos = gst_adaptive_demux_test_check_size_of_received_data; cb.appsink_event = testSeekAdaptiveAppSinkEvent; cb.pre_test = testSeekPreTestCallback; cb.post_test = testSeekPostTestCallback; cb.demux_sent_data = testSeekAdaptiveDemuxSendsData; gst_adaptive_demux_test_run (element_name, manifest_uri, &cb, testData); /* the call to g_object_unref of testData will clean up the seek task */ } void gst_adaptive_demux_test_setup (void) { GstRegistry *registry; gboolean ret; registry = gst_registry_get (); ret = gst_test_http_src_register_plugin (registry, GST_TEST_HTTP_SRC_NAME); fail_unless (ret); } void gst_adaptive_demux_test_teardown (void) { gst_test_http_src_install_callbacks (NULL, NULL); gst_test_http_src_set_default_blocksize (0); }