summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThiago Santos <ts.santos@sisa.samsung.com>2014-04-25 08:50:18 -0300
committerThiago Santos <ts.santos@sisa.samsung.com>2014-05-07 01:00:49 -0300
commitbed3d66605608811042ba0704e96b513274366b3 (patch)
treee842858868d390bb5ee3f88049dd3dc1fd5b040f
parent6f4fd7086745720c39cc1d6bfd7a1c4c845caf99 (diff)
downloadgstreamer-plugins-bad-bed3d66605608811042ba0704e96b513274366b3.tar.gz
dashdemux: remove uridownloader from fragments download
Instead, use a source element linked to a ghostpad to provide smaller buffers and more granular control for downstream buffering elements while also reducing startup latency
-rw-r--r--ext/dash/gstdashdemux.c515
-rw-r--r--ext/dash/gstdashdemux.h15
2 files changed, 374 insertions, 156 deletions
diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c
index 436b65042..58eed4978 100644
--- a/ext/dash/gstdashdemux.c
+++ b/ext/dash/gstdashdemux.c
@@ -218,6 +218,8 @@ static GstFlowReturn gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream
static gboolean gst_dash_demux_advance_period (GstDashDemux * demux);
static void gst_dash_demux_download_wait (GstDashDemuxStream * stream,
GstClockTime time_diff);
+static void gst_dash_demux_stream_download_uri (GstDashDemux * demux,
+ GstDashDemuxStream * stream, const gchar * uri, gint64 start, gint64 end);
static void gst_dash_demux_expose_streams (GstDashDemux * demux);
static void gst_dash_demux_remove_streams (GstDashDemux * demux,
@@ -226,10 +228,11 @@ static void gst_dash_demux_stream_free (GstDashDemuxStream * stream);
static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose);
static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
GstActiveStream * stream);
-static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux);
+static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux,
+ GstDashDemuxStream * stream);
#define gst_dash_demux_parent_class parent_class
-G_DEFINE_TYPE_WITH_CODE (GstDashDemux, gst_dash_demux, GST_TYPE_ELEMENT,
+G_DEFINE_TYPE_WITH_CODE (GstDashDemux, gst_dash_demux, GST_TYPE_BIN,
GST_DEBUG_CATEGORY_INIT (gst_dash_demux_debug, "dashdemux", 0,
"dashdemux element");
);
@@ -663,7 +666,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
GST_LOG_OBJECT (demux, "Creating stream %d %" GST_PTR_FORMAT, i, caps);
streams = g_slist_prepend (streams, stream);
- stream->pad = gst_dash_demux_create_pad (demux);
+ stream->pad = gst_dash_demux_create_pad (demux, stream);
stream_id =
gst_pad_create_stream_id_printf (stream->pad,
@@ -958,6 +961,7 @@ gst_dash_demux_wait_stop (GstDashDemux * demux)
GstDashDemuxStream *stream = iter->data;
gst_task_join (stream->download_task);
+ gst_element_set_state (stream->src, GST_STATE_NULL);
}
}
@@ -979,22 +983,30 @@ gst_dash_demux_stop (GstDashDemux * demux)
stream->need_header = TRUE;
gst_task_stop (stream->download_task);
GST_TASK_SIGNAL (stream->download_task);
+ gst_element_set_state (stream->src, GST_STATE_READY);
+ g_cond_signal (&stream->fragment_download_cond);
gst_uri_downloader_cancel (stream->downloader);
}
}
static GstPad *
-gst_dash_demux_create_pad (GstDashDemux * demux)
+gst_dash_demux_create_pad (GstDashDemux * demux, GstDashDemuxStream * stream)
{
GstPad *pad;
+ GstPadTemplate *tmpl;
+
+ tmpl = gst_static_pad_template_get (&srctemplate);
/* Create and activate new pads */
- pad = gst_pad_new_from_static_template (&srctemplate, NULL);
+ pad = gst_ghost_pad_new_no_target_from_template (NULL, tmpl);
+ gst_object_unref (tmpl);
+
gst_pad_set_event_function (pad,
GST_DEBUG_FUNCPTR (gst_dash_demux_src_event));
gst_pad_set_query_function (pad,
GST_DEBUG_FUNCPTR (gst_dash_demux_src_query));
- gst_pad_set_element_private (pad, demux);
+ gst_pad_set_element_private (pad, stream);
+
gst_pad_set_active (pad, TRUE);
GST_INFO_OBJECT (demux, "Creating srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
return pad;
@@ -1081,50 +1093,6 @@ gst_dash_demux_advance_period (GstDashDemux * demux)
return TRUE;
}
-static GstFlowReturn
-gst_dash_demux_push (GstDashDemuxStream * stream, GstBuffer * buffer)
-{
- GstDashDemux *demux = stream->demux;
- GstClockTime timestamp, duration;
- GstFlowReturn ret = GST_FLOW_OK;
-
- timestamp = GST_BUFFER_TIMESTAMP (buffer);
-
- if (stream->pending_segment) {
- if (demux->timestamp_offset == -1)
- demux->timestamp_offset = timestamp;
- else
- demux->timestamp_offset = MIN (timestamp, demux->timestamp_offset);
-
- /* And send a newsegment */
- gst_pad_push_event (stream->pad, stream->pending_segment);
- stream->pending_segment = NULL;
- }
-
- /* make timestamp start from 0 by subtracting the offset */
- timestamp -= demux->timestamp_offset;
- duration = GST_BUFFER_DURATION (buffer);
-
- GST_BUFFER_TIMESTAMP (buffer) = timestamp;
-
- GST_DEBUG_OBJECT (stream->pad,
- "Pushing fragment %p #%" G_GUINT64_FORMAT " (stream %d) ts:%"
- GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT, buffer,
- GST_BUFFER_OFFSET (buffer), stream->index,
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
- ret = gst_pad_push (stream->pad, gst_buffer_ref (buffer));
- GST_DEBUG_OBJECT (stream->pad, "Push result: %d %s", ret,
- gst_flow_get_name (ret));
-
- demux->segment.position = timestamp;
- stream->position = timestamp;
- if (GST_CLOCK_TIME_IS_VALID (duration))
- stream->position += duration;
-
- return ret;
-}
-
static void
gst_dash_demux_stream_free (GstDashDemuxStream * stream)
{
@@ -1148,6 +1116,20 @@ gst_dash_demux_stream_free (GstDashDemuxStream * stream)
g_cond_clear (&stream->download_cond);
g_mutex_clear (&stream->download_mutex);
+ if (stream->src_srcpad) {
+ gst_object_unref (stream->src_srcpad);
+ stream->src_srcpad = NULL;
+ }
+
+ if (stream->src) {
+ gst_element_set_state (stream->src, GST_STATE_NULL);
+ gst_bin_remove (GST_BIN_CAST (stream->demux), stream->src);
+ stream->src = NULL;
+ }
+
+ g_cond_clear (&stream->fragment_download_cond);
+ g_mutex_clear (&stream->fragment_download_lock);
+
if (stream->downloader != NULL) {
g_object_unref (stream->downloader);
}
@@ -1655,14 +1637,12 @@ gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream *
return NULL;
}
-static GstBuffer *
+static void
gst_dash_demux_download_header_fragment (GstDashDemux * demux,
GstDashDemuxStream * stream, gchar * path, gint64 range_start,
gint64 range_end)
{
- GstBuffer *buffer = NULL;
gchar *next_header_uri;
- GstFragment *fragment;
if (strncmp (path, "http://", 7) != 0) {
next_header_uri =
@@ -1673,56 +1653,37 @@ gst_dash_demux_download_header_fragment (GstDashDemux * demux,
next_header_uri = path;
}
- fragment = gst_uri_downloader_fetch_uri_with_range (stream->downloader,
- next_header_uri, demux->client->mpd_uri, FALSE, FALSE, TRUE, range_start,
- range_end, NULL);
+ gst_dash_demux_stream_download_uri (demux, stream, next_header_uri,
+ range_start, range_end);
g_free (next_header_uri);
- if (fragment) {
- buffer = gst_fragment_get_buffer (fragment);
- g_object_unref (fragment);
- }
- return buffer;
}
-static GstBuffer *
+static void
gst_dash_demux_get_next_header (GstDashDemux * demux,
GstDashDemuxStream * stream)
{
gchar *initializationURL;
- GstBuffer *header_buffer, *index_buffer = NULL;
gint64 range_start, range_end;
if (!gst_mpd_client_get_next_header (demux->client, &initializationURL,
stream->index, &range_start, &range_end))
- return NULL;
+ return;
GST_INFO_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
G_GINT64_FORMAT, initializationURL, range_start, range_end);
- header_buffer = gst_dash_demux_download_header_fragment (demux, stream,
+ gst_dash_demux_download_header_fragment (demux, stream,
initializationURL, range_start, range_end);
/* check if we have an index */
- if (header_buffer
+ if (!demux->cancelled && stream->last_ret == GST_FLOW_OK /* TODO check for other valid types */
&& gst_mpd_client_get_next_header_index (demux->client,
&initializationURL, stream->index, &range_start, &range_end)) {
GST_INFO_OBJECT (demux,
"Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
initializationURL, range_start, range_end);
- index_buffer =
- gst_dash_demux_download_header_fragment (demux, stream,
+ gst_dash_demux_download_header_fragment (demux, stream,
initializationURL, range_start, range_end);
}
-
- if (header_buffer == NULL) {
- GST_WARNING_OBJECT (demux, "Unable to fetch header");
- return NULL;
- }
-
- if (index_buffer) {
- header_buffer = gst_buffer_append (header_buffer, index_buffer);
- }
-
- return header_buffer;
}
static GstCaps *
@@ -1847,19 +1808,283 @@ gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux,
}
}
-static GstBuffer *
+static GstFlowReturn
+_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+ GstPad *srcpad = (GstPad *) parent;
+ GstDashDemux *demux = (GstDashDemux *) GST_PAD_PARENT (srcpad);
+ GstDashDemuxStream *stream = gst_pad_get_element_private (srcpad);
+ GstFlowReturn ret;
+ gboolean discont = FALSE;
+
+ if (stream->starting_fragment) {
+ if (demux->segment.rate < 0)
+ /* Set DISCONT flag for every first buffer in reverse playback mode
+ * as each fragment for its own has to be reversed */
+ discont = TRUE;
+ stream->starting_fragment = FALSE;
+
+ GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stream->current_fragment.timestamp));
+
+ GST_BUFFER_PTS (buffer) = stream->current_fragment.timestamp;
+ } else {
+ GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
+ }
+
+ if (discont) {
+ GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+ } else {
+ GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
+ }
+
+
+ GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
+ GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
+ GST_BUFFER_OFFSET (buffer) =
+ gst_mpd_client_get_segment_index (stream->active_stream) - 1;
+
+ if (stream->pending_segment) {
+ if (demux->timestamp_offset == -1)
+ demux->timestamp_offset = GST_BUFFER_PTS (buffer);
+ else
+ demux->timestamp_offset =
+ MIN (GST_BUFFER_PTS (buffer), demux->timestamp_offset);
+
+ /* And send a newsegment */
+ gst_pad_push_event (stream->pad, stream->pending_segment);
+ stream->pending_segment = NULL;
+ }
+
+ /* make timestamp start from 0 by subtracting the offset */
+ GST_BUFFER_PTS (buffer) -= demux->timestamp_offset;
+
+ stream->position = demux->segment.position = GST_BUFFER_PTS (buffer);
+
+ /* accumulate time and size to get this chunk */
+ stream->download_total_time +=
+ g_get_monotonic_time () - stream->download_start_time;
+ stream->download_total_bytes += gst_buffer_get_size (buffer);
+
+ ret = gst_proxy_pad_chain_default (pad, parent, buffer);
+ stream->download_start_time = g_get_monotonic_time ();
+ GST_LOG_OBJECT (pad, "Chain res: %d %s", ret, gst_flow_get_name (ret));
+
+ if (ret != GST_FLOW_OK) {
+ if (ret < GST_FLOW_EOS) {
+ GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL),
+ ("stream stopped, reason %s", gst_flow_get_name (ret)));
+
+ /* TODO push this on all pads */
+ gst_pad_push_event (stream->pad, gst_event_new_eos ());
+ } else {
+ GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
+ gst_flow_get_name (ret));
+ }
+
+ /* TODO properly stop tasks */
+ /* gst_hls_demux_pause_tasks (demux); */
+ }
+
+ /* avoid having the source handle the same error again */
+ stream->last_ret = ret;
+ ret = GST_FLOW_OK;
+
+ return ret;
+}
+
+static gboolean
+_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GstPad *srcpad = GST_PAD_CAST (parent);
+ GstDashDemuxStream *stream = gst_pad_get_element_private (srcpad);
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_EOS:
+ g_cond_signal (&stream->fragment_download_cond);
+ break;
+ default:
+ break;
+ }
+
+ gst_event_unref (event);
+
+ return TRUE;
+}
+
+static gboolean
+_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
+{
+ switch (GST_QUERY_TYPE (query)) {
+ case GST_QUERY_ALLOCATION:
+ return FALSE;
+ break;
+ default:
+ break;
+ }
+
+ return gst_pad_query_default (pad, parent, query);
+}
+
+static gboolean
+gst_dash_demux_stream_update_source (GstDashDemuxStream * stream,
+ const gchar * uri, const gchar * referer, gboolean refresh,
+ gboolean allow_cache)
+{
+ GstDashDemux *demux = stream->demux;
+
+ if (!gst_uri_is_valid (uri))
+ return FALSE;
+
+ if (stream->src != NULL) {
+ gchar *old_protocol, *new_protocol;
+ gchar *old_uri;
+
+ old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->src));
+ old_protocol = gst_uri_get_protocol (old_uri);
+ new_protocol = gst_uri_get_protocol (uri);
+
+ if (!g_str_equal (old_protocol, new_protocol)) {
+ gst_object_unref (stream->src_srcpad);
+ gst_element_set_state (stream->src, GST_STATE_NULL);
+ gst_bin_remove (GST_BIN_CAST (demux), stream->src);
+ stream->src = NULL;
+ stream->src_srcpad = NULL;
+ GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
+ } else {
+ GError *err = NULL;
+
+ GST_DEBUG_OBJECT (demux, "Re-using old source element");
+ if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->src), uri, &err)) {
+ GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
+ err->message);
+ g_clear_error (&err);
+ gst_element_set_state (stream->src, GST_STATE_NULL);
+ gst_bin_remove (GST_BIN_CAST (demux), stream->src);
+ stream->src = NULL;
+ }
+ }
+ g_free (old_uri);
+ g_free (old_protocol);
+ g_free (new_protocol);
+ }
+
+ if (stream->src == NULL) {
+ GObjectClass *gobject_class;
+ GstPad *internal_pad;
+
+ stream->src = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
+ if (stream->src == NULL) {
+ GST_WARNING_OBJECT (demux, "No element to handle uri: %s", uri);
+ return FALSE;
+ }
+
+ gobject_class = G_OBJECT_GET_CLASS (stream->src);
+
+ if (g_object_class_find_property (gobject_class, "compress"))
+ g_object_set (stream->src, "compress", FALSE, NULL);
+ if (g_object_class_find_property (gobject_class, "keep-alive"))
+ g_object_set (stream->src, "keep-alive", TRUE, NULL);
+ if (g_object_class_find_property (gobject_class, "extra-headers")) {
+ if (referer || refresh || !allow_cache) {
+ GstStructure *extra_headers = gst_structure_new_empty ("headers");
+
+ if (referer)
+ gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
+ NULL);
+
+ if (!allow_cache)
+ gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
+ "no-cache", NULL);
+ else if (refresh)
+ gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
+ "max-age=0", NULL);
+
+ g_object_set (stream->src, "extra-headers", extra_headers, NULL);
+
+ gst_structure_free (extra_headers);
+ } else {
+ g_object_set (stream->src, "extra-headers", NULL, NULL);
+ }
+ }
+
+ gst_element_set_locked_state (stream->src, TRUE);
+ gst_bin_add (GST_BIN_CAST (demux), stream->src);
+ stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
+
+ gst_ghost_pad_set_target (GST_GHOST_PAD_CAST (stream->pad),
+ stream->src_srcpad);
+
+ /* set up our internal pad to drop all events from
+ * the http src we don't care about. On the chain function
+ * we just push the buffer forward, but this way dash can get
+ * the flow return from downstream */
+ internal_pad =
+ GST_PAD_CAST (gst_proxy_pad_get_internal (GST_PROXY_PAD (stream->pad)));
+ gst_pad_set_chain_function (GST_PAD_CAST (internal_pad), _src_chain);
+ gst_pad_set_event_function (GST_PAD_CAST (internal_pad), _src_event);
+ /* need to set query otherwise deadlocks happen with allocation queries */
+ gst_pad_set_query_function (GST_PAD_CAST (internal_pad), _src_query);
+ gst_object_unref (internal_pad);
+ }
+ return TRUE;
+}
+
+/* must be called with the stream's fragment_download_lock */
+static void
+gst_dash_demux_stream_download_uri (GstDashDemux * demux,
+ GstDashDemuxStream * stream, const gchar * uri, gint64 start, gint64 end)
+{
+ GST_DEBUG_OBJECT (stream->pad, "Downloading uri: %s, range:%" G_GINT64_FORMAT
+ " - %" G_GINT64_FORMAT, uri, start, end);
+
+ /* TODO check return */
+ gst_dash_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE);
+ if (gst_element_set_state (stream->src,
+ GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
+ if (start != 0 || end != -1) {
+ if (!gst_element_send_event (stream->src, gst_event_new_seek (1.0,
+ GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
+ GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
+
+ /* looks like the source can't handle seeks in READY */
+/*
+ *err = g_error_new (GST_CORE_ERROR, GST_CORE_ERROR_NOT_IMPLEMENTED,
+ "Source element can't handle range requests");
+*/
+ stream->last_ret = GST_FLOW_ERROR;
+ }
+ }
+
+ if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
+ /* flush the proxypads so that the EOS state is reset */
+ gst_pad_push_event (stream->src_srcpad, gst_event_new_flush_start ());
+ gst_pad_push_event (stream->src_srcpad, gst_event_new_flush_stop (TRUE));
+
+ stream->download_start_time = g_get_monotonic_time ();
+ gst_element_sync_state_with_parent (stream->src);
+
+ /* wait for the fragment to be completely downloaded */
+ GST_DEBUG_OBJECT (stream->pad,
+ "Waiting for fragment download to finish: %s", uri);
+ g_cond_wait (&stream->fragment_download_cond,
+ &stream->fragment_download_lock);
+ }
+ } else {
+ stream->last_ret = GST_FLOW_CUSTOM_ERROR;
+ }
+
+ gst_element_set_state (stream->src, GST_STATE_READY);
+}
+
+static void
gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
- GstDashDemuxStream * stream, guint64 * size_buffer,
- GstClockTime * download_time)
+ GstDashDemuxStream * stream)
{
GstActiveStream *active_stream;
- GstFragment *download;
- GTimeVal now;
- GTimeVal start;
guint stream_idx = stream->index;
- GstBuffer *buffer = NULL;
- GstBuffer *header_buffer;
- GstMediaFragmentInfo fragment;
+ GstMediaFragmentInfo *fragment = &stream->current_fragment;
if (G_UNLIKELY (stream->restart_download)) {
GstClockTime cur, ts;
@@ -1914,84 +2139,72 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
stream->restart_download = FALSE;
}
- if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, &fragment)) {
- g_get_current_time (&start);
+ g_mutex_lock (&stream->fragment_download_lock);
+ if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, fragment)) {
GST_INFO_OBJECT (stream->pad,
"Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%"
GST_TIME_FORMAT " Range:%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
- fragment.uri, GST_TIME_ARGS (fragment.timestamp),
- GST_TIME_ARGS (fragment.duration),
- fragment.range_start, fragment.range_end);
-
- download = gst_uri_downloader_fetch_uri_with_range (stream->downloader,
- fragment.uri, demux->client->mpd_uri, FALSE, FALSE, TRUE,
- fragment.range_start, fragment.range_end, NULL);
-
- if (download == NULL) {
- gst_media_fragment_info_clear (&fragment);
- return NULL;
- }
+ fragment->uri, GST_TIME_ARGS (fragment->timestamp),
+ GST_TIME_ARGS (fragment->duration),
+ fragment->range_start, fragment->range_end);
- active_stream = stream->active_stream;
- if (active_stream == NULL) {
- gst_media_fragment_info_clear (&fragment);
- g_object_unref (download);
- return NULL;
+ if (stream->need_header) {
+ /* We need to fetch a new header */
+ gst_dash_demux_get_next_header (demux, stream);
+ stream->need_header = FALSE;
}
- buffer = gst_fragment_get_buffer (download);
- g_object_unref (download);
-
/* it is possible to have an index per fragment, so check and download */
- if (fragment.index_uri || fragment.index_range_start
- || fragment.index_range_end != -1) {
- const gchar *uri = fragment.index_uri;
- GstBuffer *index_buffer;
+ if (fragment->index_uri || fragment->index_range_start
+ || fragment->index_range_end != -1) {
+ const gchar *uri = fragment->index_uri;
if (!uri) /* fallback to default media uri */
- uri = fragment.uri;
+ uri = fragment->uri;
GST_DEBUG_OBJECT (stream->pad,
"Fragment index download: %s %" G_GINT64_FORMAT "-%"
- G_GINT64_FORMAT, uri, fragment.index_range_start,
- fragment.index_range_end);
- download =
- gst_uri_downloader_fetch_uri_with_range (stream->downloader, uri,
- demux->client->mpd_uri, FALSE, FALSE, TRUE,
- fragment.index_range_start, fragment.index_range_end, NULL);
- if (download) {
- index_buffer = gst_fragment_get_buffer (download);
- if (index_buffer)
- buffer = gst_buffer_append (index_buffer, buffer);
- g_object_unref (download);
- }
+ G_GINT64_FORMAT, uri, fragment->index_range_start,
+ fragment->index_range_end);
+ gst_dash_demux_stream_download_uri (demux, stream, uri,
+ fragment->index_range_start, fragment->index_range_end);
}
- if (stream->need_header) {
- /* We need to fetch a new header */
- if ((header_buffer =
- gst_dash_demux_get_next_header (demux, stream)) != NULL) {
- buffer = gst_buffer_append (header_buffer, buffer);
- }
- stream->need_header = FALSE;
+ if (demux->cancelled)
+ goto exit;
+
+ /* now get the real fragment */
+ gst_dash_demux_stream_download_uri (demux, stream, fragment->uri,
+ fragment->range_start, fragment->range_end);
+
+ if (stream->last_ret < GST_FLOW_EOS) {
+ gst_media_fragment_info_clear (fragment);
+ goto exit;
}
- g_get_current_time (&now);
- *download_time = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (start));
- buffer = gst_buffer_make_writable (buffer);
+ active_stream = stream->active_stream;
+ if (active_stream == NULL) {
+ gst_media_fragment_info_clear (fragment);
+ stream->last_ret = GST_FLOW_ERROR;
+ goto exit;
+ }
- GST_BUFFER_TIMESTAMP (buffer) = fragment.timestamp;
- GST_BUFFER_DURATION (buffer) = fragment.duration;
- GST_BUFFER_OFFSET (buffer) =
- gst_mpd_client_get_segment_index (active_stream) - 1;
+ if (stream->last_ret == GST_FLOW_OK) {
+ stream->position += fragment->duration;
+ }
- gst_media_fragment_info_clear (&fragment);
- *size_buffer += gst_buffer_get_size (buffer);
+ gst_media_fragment_info_clear (fragment);
} else {
GST_WARNING_OBJECT (demux, "Failed to download fragment for stream %p %d",
stream, stream->index);
}
- return buffer;
+
+
+exit:
+ g_mutex_unlock (&stream->fragment_download_lock);
+
+ /* TODO a race can set it to READY after it was set to NULL */
+ gst_element_set_state (stream->src, GST_STATE_READY);
}
/* gst_dash_demux_stream_get_next_fragment:
@@ -2008,9 +2221,6 @@ static GstFlowReturn
gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream * stream,
GstClockTime * ts)
{
- guint64 buffer_size = 0;
- GstClockTime diff;
- GstBuffer *buffer = NULL;
GstFlowReturn ret = GST_FLOW_OK;
GstDashDemux *demux = stream->demux;
@@ -2048,19 +2258,15 @@ gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream * stream,
}
/* Get the fragment corresponding to each stream index */
- buffer =
- gst_dash_demux_stream_download_fragment (demux, stream,
- &buffer_size, &diff);
+ gst_dash_demux_stream_download_fragment (demux, stream);
demux->end_of_period = FALSE;
- if (buffer) {
- ret = gst_dash_demux_push (stream, buffer);
- } else {
+ if (stream->last_ret < GST_FLOW_EOS) {
GST_WARNING_OBJECT (stream->pad, "Failed to download fragment");
return GST_FLOW_ERROR;
}
-
+#if 0
if (buffer_size > 0 && diff > 0) {
#ifndef GST_DISABLE_GST_DEBUG
guint64 brate;
@@ -2076,6 +2282,7 @@ gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream * stream,
G_GUINT64_FORMAT " Ko in %.2f s)", stream->index, brate / 1000,
buffer_size / 1024, ((double) diff / GST_SECOND));
}
+#endif
return ret;
}
diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h
index 935e7e9b1..6d6eeedf1 100644
--- a/ext/dash/gstdashdemux.h
+++ b/ext/dash/gstdashdemux.h
@@ -81,6 +81,17 @@ struct _GstDashDemuxStream
GstUriDownloader *downloader;
GstDownloadRate dnl_rate;
+
+ /* download tooling */
+ GstElement *src;
+ GstPad *src_srcpad;
+ GMutex fragment_download_lock;
+ GCond fragment_download_cond;
+ GstMediaFragmentInfo current_fragment;
+ gboolean starting_fragment;
+ gint64 download_start_time;
+ gint64 download_total_time;
+ gint64 download_total_bytes;
};
/**
@@ -90,7 +101,7 @@ struct _GstDashDemuxStream
*/
struct _GstDashDemux
{
- GstElement parent;
+ GstBin parent;
GstPad *sinkpad;
gboolean have_group_id;
@@ -123,7 +134,7 @@ struct _GstDashDemux
struct _GstDashDemuxClass
{
- GstElementClass parent_class;
+ GstBinClass parent_class;
};
GType gst_dash_demux_get_type (void);