diff options
author | Thiago Santos <ts.santos@sisa.samsung.com> | 2014-04-25 08:50:18 -0300 |
---|---|---|
committer | Thiago Santos <ts.santos@sisa.samsung.com> | 2014-05-07 01:00:49 -0300 |
commit | bed3d66605608811042ba0704e96b513274366b3 (patch) | |
tree | e842858868d390bb5ee3f88049dd3dc1fd5b040f | |
parent | 6f4fd7086745720c39cc1d6bfd7a1c4c845caf99 (diff) | |
download | gstreamer-plugins-bad-bed3d66605608811042ba0704e96b513274366b3.tar.gz |
dashdemux: remove uridownloader from fragments download
Instead, use a source element linked to a ghostpad to provide
smaller buffers and more granular control for downstream
buffering elements while also reducing startup latency
-rw-r--r-- | ext/dash/gstdashdemux.c | 515 | ||||
-rw-r--r-- | ext/dash/gstdashdemux.h | 15 |
2 files changed, 374 insertions, 156 deletions
diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c index 436b65042..58eed4978 100644 --- a/ext/dash/gstdashdemux.c +++ b/ext/dash/gstdashdemux.c @@ -218,6 +218,8 @@ static GstFlowReturn gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream static gboolean gst_dash_demux_advance_period (GstDashDemux * demux); static void gst_dash_demux_download_wait (GstDashDemuxStream * stream, GstClockTime time_diff); +static void gst_dash_demux_stream_download_uri (GstDashDemux * demux, + GstDashDemuxStream * stream, const gchar * uri, gint64 start, gint64 end); static void gst_dash_demux_expose_streams (GstDashDemux * demux); static void gst_dash_demux_remove_streams (GstDashDemux * demux, @@ -226,10 +228,11 @@ static void gst_dash_demux_stream_free (GstDashDemuxStream * stream); static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose); static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream); -static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux); +static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux, + GstDashDemuxStream * stream); #define gst_dash_demux_parent_class parent_class -G_DEFINE_TYPE_WITH_CODE (GstDashDemux, gst_dash_demux, GST_TYPE_ELEMENT, +G_DEFINE_TYPE_WITH_CODE (GstDashDemux, gst_dash_demux, GST_TYPE_BIN, GST_DEBUG_CATEGORY_INIT (gst_dash_demux_debug, "dashdemux", 0, "dashdemux element"); ); @@ -663,7 +666,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) GST_LOG_OBJECT (demux, "Creating stream %d %" GST_PTR_FORMAT, i, caps); streams = g_slist_prepend (streams, stream); - stream->pad = gst_dash_demux_create_pad (demux); + stream->pad = gst_dash_demux_create_pad (demux, stream); stream_id = gst_pad_create_stream_id_printf (stream->pad, @@ -958,6 +961,7 @@ gst_dash_demux_wait_stop (GstDashDemux * demux) GstDashDemuxStream *stream = iter->data; gst_task_join (stream->download_task); + gst_element_set_state (stream->src, GST_STATE_NULL); } } @@ -979,22 +983,30 @@ gst_dash_demux_stop (GstDashDemux * demux) stream->need_header = TRUE; gst_task_stop (stream->download_task); GST_TASK_SIGNAL (stream->download_task); + gst_element_set_state (stream->src, GST_STATE_READY); + g_cond_signal (&stream->fragment_download_cond); gst_uri_downloader_cancel (stream->downloader); } } static GstPad * -gst_dash_demux_create_pad (GstDashDemux * demux) +gst_dash_demux_create_pad (GstDashDemux * demux, GstDashDemuxStream * stream) { GstPad *pad; + GstPadTemplate *tmpl; + + tmpl = gst_static_pad_template_get (&srctemplate); /* Create and activate new pads */ - pad = gst_pad_new_from_static_template (&srctemplate, NULL); + pad = gst_ghost_pad_new_no_target_from_template (NULL, tmpl); + gst_object_unref (tmpl); + gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_dash_demux_src_event)); gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_dash_demux_src_query)); - gst_pad_set_element_private (pad, demux); + gst_pad_set_element_private (pad, stream); + gst_pad_set_active (pad, TRUE); GST_INFO_OBJECT (demux, "Creating srcpad %s:%s", GST_DEBUG_PAD_NAME (pad)); return pad; @@ -1081,50 +1093,6 @@ gst_dash_demux_advance_period (GstDashDemux * demux) return TRUE; } -static GstFlowReturn -gst_dash_demux_push (GstDashDemuxStream * stream, GstBuffer * buffer) -{ - GstDashDemux *demux = stream->demux; - GstClockTime timestamp, duration; - GstFlowReturn ret = GST_FLOW_OK; - - timestamp = GST_BUFFER_TIMESTAMP (buffer); - - if (stream->pending_segment) { - if (demux->timestamp_offset == -1) - demux->timestamp_offset = timestamp; - else - demux->timestamp_offset = MIN (timestamp, demux->timestamp_offset); - - /* And send a newsegment */ - gst_pad_push_event (stream->pad, stream->pending_segment); - stream->pending_segment = NULL; - } - - /* make timestamp start from 0 by subtracting the offset */ - timestamp -= demux->timestamp_offset; - duration = GST_BUFFER_DURATION (buffer); - - GST_BUFFER_TIMESTAMP (buffer) = timestamp; - - GST_DEBUG_OBJECT (stream->pad, - "Pushing fragment %p #%" G_GUINT64_FORMAT " (stream %d) ts:%" - GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT, buffer, - GST_BUFFER_OFFSET (buffer), stream->index, - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), - GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); - ret = gst_pad_push (stream->pad, gst_buffer_ref (buffer)); - GST_DEBUG_OBJECT (stream->pad, "Push result: %d %s", ret, - gst_flow_get_name (ret)); - - demux->segment.position = timestamp; - stream->position = timestamp; - if (GST_CLOCK_TIME_IS_VALID (duration)) - stream->position += duration; - - return ret; -} - static void gst_dash_demux_stream_free (GstDashDemuxStream * stream) { @@ -1148,6 +1116,20 @@ gst_dash_demux_stream_free (GstDashDemuxStream * stream) g_cond_clear (&stream->download_cond); g_mutex_clear (&stream->download_mutex); + if (stream->src_srcpad) { + gst_object_unref (stream->src_srcpad); + stream->src_srcpad = NULL; + } + + if (stream->src) { + gst_element_set_state (stream->src, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (stream->demux), stream->src); + stream->src = NULL; + } + + g_cond_clear (&stream->fragment_download_cond); + g_mutex_clear (&stream->fragment_download_lock); + if (stream->downloader != NULL) { g_object_unref (stream->downloader); } @@ -1655,14 +1637,12 @@ gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream * return NULL; } -static GstBuffer * +static void gst_dash_demux_download_header_fragment (GstDashDemux * demux, GstDashDemuxStream * stream, gchar * path, gint64 range_start, gint64 range_end) { - GstBuffer *buffer = NULL; gchar *next_header_uri; - GstFragment *fragment; if (strncmp (path, "http://", 7) != 0) { next_header_uri = @@ -1673,56 +1653,37 @@ gst_dash_demux_download_header_fragment (GstDashDemux * demux, next_header_uri = path; } - fragment = gst_uri_downloader_fetch_uri_with_range (stream->downloader, - next_header_uri, demux->client->mpd_uri, FALSE, FALSE, TRUE, range_start, - range_end, NULL); + gst_dash_demux_stream_download_uri (demux, stream, next_header_uri, + range_start, range_end); g_free (next_header_uri); - if (fragment) { - buffer = gst_fragment_get_buffer (fragment); - g_object_unref (fragment); - } - return buffer; } -static GstBuffer * +static void gst_dash_demux_get_next_header (GstDashDemux * demux, GstDashDemuxStream * stream) { gchar *initializationURL; - GstBuffer *header_buffer, *index_buffer = NULL; gint64 range_start, range_end; if (!gst_mpd_client_get_next_header (demux->client, &initializationURL, stream->index, &range_start, &range_end)) - return NULL; + return; GST_INFO_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT, initializationURL, range_start, range_end); - header_buffer = gst_dash_demux_download_header_fragment (demux, stream, + gst_dash_demux_download_header_fragment (demux, stream, initializationURL, range_start, range_end); /* check if we have an index */ - if (header_buffer + if (!demux->cancelled && stream->last_ret == GST_FLOW_OK /* TODO check for other valid types */ && gst_mpd_client_get_next_header_index (demux->client, &initializationURL, stream->index, &range_start, &range_end)) { GST_INFO_OBJECT (demux, "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT, initializationURL, range_start, range_end); - index_buffer = - gst_dash_demux_download_header_fragment (demux, stream, + gst_dash_demux_download_header_fragment (demux, stream, initializationURL, range_start, range_end); } - - if (header_buffer == NULL) { - GST_WARNING_OBJECT (demux, "Unable to fetch header"); - return NULL; - } - - if (index_buffer) { - header_buffer = gst_buffer_append (header_buffer, index_buffer); - } - - return header_buffer; } static GstCaps * @@ -1847,19 +1808,283 @@ gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux, } } -static GstBuffer * +static GstFlowReturn +_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstPad *srcpad = (GstPad *) parent; + GstDashDemux *demux = (GstDashDemux *) GST_PAD_PARENT (srcpad); + GstDashDemuxStream *stream = gst_pad_get_element_private (srcpad); + GstFlowReturn ret; + gboolean discont = FALSE; + + if (stream->starting_fragment) { + if (demux->segment.rate < 0) + /* Set DISCONT flag for every first buffer in reverse playback mode + * as each fragment for its own has to be reversed */ + discont = TRUE; + stream->starting_fragment = FALSE; + + GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT, + GST_TIME_ARGS (stream->current_fragment.timestamp)); + + GST_BUFFER_PTS (buffer) = stream->current_fragment.timestamp; + } else { + GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE; + } + + if (discont) { + GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous"); + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + } else { + GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT); + } + + + GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE; + GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE; + GST_BUFFER_OFFSET (buffer) = + gst_mpd_client_get_segment_index (stream->active_stream) - 1; + + if (stream->pending_segment) { + if (demux->timestamp_offset == -1) + demux->timestamp_offset = GST_BUFFER_PTS (buffer); + else + demux->timestamp_offset = + MIN (GST_BUFFER_PTS (buffer), demux->timestamp_offset); + + /* And send a newsegment */ + gst_pad_push_event (stream->pad, stream->pending_segment); + stream->pending_segment = NULL; + } + + /* make timestamp start from 0 by subtracting the offset */ + GST_BUFFER_PTS (buffer) -= demux->timestamp_offset; + + stream->position = demux->segment.position = GST_BUFFER_PTS (buffer); + + /* accumulate time and size to get this chunk */ + stream->download_total_time += + g_get_monotonic_time () - stream->download_start_time; + stream->download_total_bytes += gst_buffer_get_size (buffer); + + ret = gst_proxy_pad_chain_default (pad, parent, buffer); + stream->download_start_time = g_get_monotonic_time (); + GST_LOG_OBJECT (pad, "Chain res: %d %s", ret, gst_flow_get_name (ret)); + + if (ret != GST_FLOW_OK) { + if (ret < GST_FLOW_EOS) { + GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL), + ("stream stopped, reason %s", gst_flow_get_name (ret))); + + /* TODO push this on all pads */ + gst_pad_push_event (stream->pad, gst_event_new_eos ()); + } else { + GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s", + gst_flow_get_name (ret)); + } + + /* TODO properly stop tasks */ + /* gst_hls_demux_pause_tasks (demux); */ + } + + /* avoid having the source handle the same error again */ + stream->last_ret = ret; + ret = GST_FLOW_OK; + + return ret; +} + +static gboolean +_src_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstPad *srcpad = GST_PAD_CAST (parent); + GstDashDemuxStream *stream = gst_pad_get_element_private (srcpad); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + g_cond_signal (&stream->fragment_download_cond); + break; + default: + break; + } + + gst_event_unref (event); + + return TRUE; +} + +static gboolean +_src_query (GstPad * pad, GstObject * parent, GstQuery * query) +{ + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_ALLOCATION: + return FALSE; + break; + default: + break; + } + + return gst_pad_query_default (pad, parent, query); +} + +static gboolean +gst_dash_demux_stream_update_source (GstDashDemuxStream * stream, + const gchar * uri, const gchar * referer, gboolean refresh, + gboolean allow_cache) +{ + GstDashDemux *demux = stream->demux; + + if (!gst_uri_is_valid (uri)) + return FALSE; + + if (stream->src != NULL) { + gchar *old_protocol, *new_protocol; + gchar *old_uri; + + old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->src)); + old_protocol = gst_uri_get_protocol (old_uri); + new_protocol = gst_uri_get_protocol (uri); + + if (!g_str_equal (old_protocol, new_protocol)) { + gst_object_unref (stream->src_srcpad); + gst_element_set_state (stream->src, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (demux), stream->src); + stream->src = NULL; + stream->src_srcpad = NULL; + GST_DEBUG_OBJECT (demux, "Can't re-use old source element"); + } else { + GError *err = NULL; + + GST_DEBUG_OBJECT (demux, "Re-using old source element"); + if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->src), uri, &err)) { + GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s", + err->message); + g_clear_error (&err); + gst_element_set_state (stream->src, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (demux), stream->src); + stream->src = NULL; + } + } + g_free (old_uri); + g_free (old_protocol); + g_free (new_protocol); + } + + if (stream->src == NULL) { + GObjectClass *gobject_class; + GstPad *internal_pad; + + stream->src = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL); + if (stream->src == NULL) { + GST_WARNING_OBJECT (demux, "No element to handle uri: %s", uri); + return FALSE; + } + + gobject_class = G_OBJECT_GET_CLASS (stream->src); + + if (g_object_class_find_property (gobject_class, "compress")) + g_object_set (stream->src, "compress", FALSE, NULL); + if (g_object_class_find_property (gobject_class, "keep-alive")) + g_object_set (stream->src, "keep-alive", TRUE, NULL); + if (g_object_class_find_property (gobject_class, "extra-headers")) { + if (referer || refresh || !allow_cache) { + GstStructure *extra_headers = gst_structure_new_empty ("headers"); + + if (referer) + gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer, + NULL); + + if (!allow_cache) + gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING, + "no-cache", NULL); + else if (refresh) + gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING, + "max-age=0", NULL); + + g_object_set (stream->src, "extra-headers", extra_headers, NULL); + + gst_structure_free (extra_headers); + } else { + g_object_set (stream->src, "extra-headers", NULL, NULL); + } + } + + gst_element_set_locked_state (stream->src, TRUE); + gst_bin_add (GST_BIN_CAST (demux), stream->src); + stream->src_srcpad = gst_element_get_static_pad (stream->src, "src"); + + gst_ghost_pad_set_target (GST_GHOST_PAD_CAST (stream->pad), + stream->src_srcpad); + + /* set up our internal pad to drop all events from + * the http src we don't care about. On the chain function + * we just push the buffer forward, but this way dash can get + * the flow return from downstream */ + internal_pad = + GST_PAD_CAST (gst_proxy_pad_get_internal (GST_PROXY_PAD (stream->pad))); + gst_pad_set_chain_function (GST_PAD_CAST (internal_pad), _src_chain); + gst_pad_set_event_function (GST_PAD_CAST (internal_pad), _src_event); + /* need to set query otherwise deadlocks happen with allocation queries */ + gst_pad_set_query_function (GST_PAD_CAST (internal_pad), _src_query); + gst_object_unref (internal_pad); + } + return TRUE; +} + +/* must be called with the stream's fragment_download_lock */ +static void +gst_dash_demux_stream_download_uri (GstDashDemux * demux, + GstDashDemuxStream * stream, const gchar * uri, gint64 start, gint64 end) +{ + GST_DEBUG_OBJECT (stream->pad, "Downloading uri: %s, range:%" G_GINT64_FORMAT + " - %" G_GINT64_FORMAT, uri, start, end); + + /* TODO check return */ + gst_dash_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE); + if (gst_element_set_state (stream->src, + GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) { + if (start != 0 || end != -1) { + if (!gst_element_send_event (stream->src, gst_event_new_seek (1.0, + GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH, + GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) { + + /* looks like the source can't handle seeks in READY */ +/* + *err = g_error_new (GST_CORE_ERROR, GST_CORE_ERROR_NOT_IMPLEMENTED, + "Source element can't handle range requests"); +*/ + stream->last_ret = GST_FLOW_ERROR; + } + } + + if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) { + /* flush the proxypads so that the EOS state is reset */ + gst_pad_push_event (stream->src_srcpad, gst_event_new_flush_start ()); + gst_pad_push_event (stream->src_srcpad, gst_event_new_flush_stop (TRUE)); + + stream->download_start_time = g_get_monotonic_time (); + gst_element_sync_state_with_parent (stream->src); + + /* wait for the fragment to be completely downloaded */ + GST_DEBUG_OBJECT (stream->pad, + "Waiting for fragment download to finish: %s", uri); + g_cond_wait (&stream->fragment_download_cond, + &stream->fragment_download_lock); + } + } else { + stream->last_ret = GST_FLOW_CUSTOM_ERROR; + } + + gst_element_set_state (stream->src, GST_STATE_READY); +} + +static void gst_dash_demux_stream_download_fragment (GstDashDemux * demux, - GstDashDemuxStream * stream, guint64 * size_buffer, - GstClockTime * download_time) + GstDashDemuxStream * stream) { GstActiveStream *active_stream; - GstFragment *download; - GTimeVal now; - GTimeVal start; guint stream_idx = stream->index; - GstBuffer *buffer = NULL; - GstBuffer *header_buffer; - GstMediaFragmentInfo fragment; + GstMediaFragmentInfo *fragment = &stream->current_fragment; if (G_UNLIKELY (stream->restart_download)) { GstClockTime cur, ts; @@ -1914,84 +2139,72 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux, stream->restart_download = FALSE; } - if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, &fragment)) { - g_get_current_time (&start); + g_mutex_lock (&stream->fragment_download_lock); + if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, fragment)) { GST_INFO_OBJECT (stream->pad, "Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT " Range:%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT, - fragment.uri, GST_TIME_ARGS (fragment.timestamp), - GST_TIME_ARGS (fragment.duration), - fragment.range_start, fragment.range_end); - - download = gst_uri_downloader_fetch_uri_with_range (stream->downloader, - fragment.uri, demux->client->mpd_uri, FALSE, FALSE, TRUE, - fragment.range_start, fragment.range_end, NULL); - - if (download == NULL) { - gst_media_fragment_info_clear (&fragment); - return NULL; - } + fragment->uri, GST_TIME_ARGS (fragment->timestamp), + GST_TIME_ARGS (fragment->duration), + fragment->range_start, fragment->range_end); - active_stream = stream->active_stream; - if (active_stream == NULL) { - gst_media_fragment_info_clear (&fragment); - g_object_unref (download); - return NULL; + if (stream->need_header) { + /* We need to fetch a new header */ + gst_dash_demux_get_next_header (demux, stream); + stream->need_header = FALSE; } - buffer = gst_fragment_get_buffer (download); - g_object_unref (download); - /* it is possible to have an index per fragment, so check and download */ - if (fragment.index_uri || fragment.index_range_start - || fragment.index_range_end != -1) { - const gchar *uri = fragment.index_uri; - GstBuffer *index_buffer; + if (fragment->index_uri || fragment->index_range_start + || fragment->index_range_end != -1) { + const gchar *uri = fragment->index_uri; if (!uri) /* fallback to default media uri */ - uri = fragment.uri; + uri = fragment->uri; GST_DEBUG_OBJECT (stream->pad, "Fragment index download: %s %" G_GINT64_FORMAT "-%" - G_GINT64_FORMAT, uri, fragment.index_range_start, - fragment.index_range_end); - download = - gst_uri_downloader_fetch_uri_with_range (stream->downloader, uri, - demux->client->mpd_uri, FALSE, FALSE, TRUE, - fragment.index_range_start, fragment.index_range_end, NULL); - if (download) { - index_buffer = gst_fragment_get_buffer (download); - if (index_buffer) - buffer = gst_buffer_append (index_buffer, buffer); - g_object_unref (download); - } + G_GINT64_FORMAT, uri, fragment->index_range_start, + fragment->index_range_end); + gst_dash_demux_stream_download_uri (demux, stream, uri, + fragment->index_range_start, fragment->index_range_end); } - if (stream->need_header) { - /* We need to fetch a new header */ - if ((header_buffer = - gst_dash_demux_get_next_header (demux, stream)) != NULL) { - buffer = gst_buffer_append (header_buffer, buffer); - } - stream->need_header = FALSE; + if (demux->cancelled) + goto exit; + + /* now get the real fragment */ + gst_dash_demux_stream_download_uri (demux, stream, fragment->uri, + fragment->range_start, fragment->range_end); + + if (stream->last_ret < GST_FLOW_EOS) { + gst_media_fragment_info_clear (fragment); + goto exit; } - g_get_current_time (&now); - *download_time = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (start)); - buffer = gst_buffer_make_writable (buffer); + active_stream = stream->active_stream; + if (active_stream == NULL) { + gst_media_fragment_info_clear (fragment); + stream->last_ret = GST_FLOW_ERROR; + goto exit; + } - GST_BUFFER_TIMESTAMP (buffer) = fragment.timestamp; - GST_BUFFER_DURATION (buffer) = fragment.duration; - GST_BUFFER_OFFSET (buffer) = - gst_mpd_client_get_segment_index (active_stream) - 1; + if (stream->last_ret == GST_FLOW_OK) { + stream->position += fragment->duration; + } - gst_media_fragment_info_clear (&fragment); - *size_buffer += gst_buffer_get_size (buffer); + gst_media_fragment_info_clear (fragment); } else { GST_WARNING_OBJECT (demux, "Failed to download fragment for stream %p %d", stream, stream->index); } - return buffer; + + +exit: + g_mutex_unlock (&stream->fragment_download_lock); + + /* TODO a race can set it to READY after it was set to NULL */ + gst_element_set_state (stream->src, GST_STATE_READY); } /* gst_dash_demux_stream_get_next_fragment: @@ -2008,9 +2221,6 @@ static GstFlowReturn gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream * stream, GstClockTime * ts) { - guint64 buffer_size = 0; - GstClockTime diff; - GstBuffer *buffer = NULL; GstFlowReturn ret = GST_FLOW_OK; GstDashDemux *demux = stream->demux; @@ -2048,19 +2258,15 @@ gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream * stream, } /* Get the fragment corresponding to each stream index */ - buffer = - gst_dash_demux_stream_download_fragment (demux, stream, - &buffer_size, &diff); + gst_dash_demux_stream_download_fragment (demux, stream); demux->end_of_period = FALSE; - if (buffer) { - ret = gst_dash_demux_push (stream, buffer); - } else { + if (stream->last_ret < GST_FLOW_EOS) { GST_WARNING_OBJECT (stream->pad, "Failed to download fragment"); return GST_FLOW_ERROR; } - +#if 0 if (buffer_size > 0 && diff > 0) { #ifndef GST_DISABLE_GST_DEBUG guint64 brate; @@ -2076,6 +2282,7 @@ gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream * stream, G_GUINT64_FORMAT " Ko in %.2f s)", stream->index, brate / 1000, buffer_size / 1024, ((double) diff / GST_SECOND)); } +#endif return ret; } diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h index 935e7e9b1..6d6eeedf1 100644 --- a/ext/dash/gstdashdemux.h +++ b/ext/dash/gstdashdemux.h @@ -81,6 +81,17 @@ struct _GstDashDemuxStream GstUriDownloader *downloader; GstDownloadRate dnl_rate; + + /* download tooling */ + GstElement *src; + GstPad *src_srcpad; + GMutex fragment_download_lock; + GCond fragment_download_cond; + GstMediaFragmentInfo current_fragment; + gboolean starting_fragment; + gint64 download_start_time; + gint64 download_total_time; + gint64 download_total_bytes; }; /** @@ -90,7 +101,7 @@ struct _GstDashDemuxStream */ struct _GstDashDemux { - GstElement parent; + GstBin parent; GstPad *sinkpad; gboolean have_group_id; @@ -123,7 +134,7 @@ struct _GstDashDemux struct _GstDashDemuxClass { - GstElementClass parent_class; + GstBinClass parent_class; }; GType gst_dash_demux_get_type (void); |