diff options
author | Thiago Santos <ts.santos@sisa.samsung.com> | 2014-04-07 13:57:26 -0300 |
---|---|---|
committer | Thiago Santos <ts.santos@sisa.samsung.com> | 2014-04-29 18:17:07 -0300 |
commit | 36117595579cfde296c905f170e2b14ef8a6c534 (patch) | |
tree | 12855424735bb530f2b7bad02c8861ea0008f233 /ext | |
parent | eee4f95a1fb7820d58a8f777f9b4a77aa441733e (diff) | |
download | gstreamer-plugins-bad-36117595579cfde296c905f170e2b14ef8a6c534.tar.gz |
hlsdemux: replace uridownloader with a GstElement
The GstElement is directly linked into a ghost pad and
its buffers are pushed as received downstream. This way the
buffers are small enough and not a whole fragment that usually
causes extra latency and makes buffering harder
Diffstat (limited to 'ext')
-rw-r--r-- | ext/hls/gsthlsdemux.c | 346 | ||||
-rw-r--r-- | ext/hls/gsthlsdemux.h | 11 |
2 files changed, 204 insertions, 153 deletions
diff --git a/ext/hls/gsthlsdemux.c b/ext/hls/gsthlsdemux.c index 566809f30..11b67298d 100644 --- a/ext/hls/gsthlsdemux.c +++ b/ext/hls/gsthlsdemux.c @@ -47,6 +47,7 @@ #else #include <gcrypt.h> #endif +#include <gst/base/gsttypefindhelper.h> #include "gsthlsdemux.h" static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u", @@ -101,9 +102,11 @@ static void gst_hls_demux_stream_loop (GstHLSDemux * demux); static void gst_hls_demux_updates_loop (GstHLSDemux * demux); static void gst_hls_demux_stop (GstHLSDemux * demux); static void gst_hls_demux_pause_tasks (GstHLSDemux * demux); +#if 0 static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux, GstFragment * fragment); -static GstFragment *gst_hls_demux_get_next_fragment (GstHLSDemux * demux, +#endif +static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean * end_of_playlist, GError ** err); static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux, gboolean update, GError ** err); @@ -116,7 +119,7 @@ static gboolean gst_hls_demux_change_playlist (GstHLSDemux * demux, guint max_bitrate); #define gst_hls_demux_parent_class parent_class -G_DEFINE_TYPE (GstHLSDemux, gst_hls_demux, GST_TYPE_ELEMENT); +G_DEFINE_TYPE (GstHLSDemux, gst_hls_demux, GST_TYPE_BIN); static void gst_hls_demux_dispose (GObject * obj) @@ -146,6 +149,8 @@ gst_hls_demux_dispose (GObject * obj) g_cond_clear (&demux->download_cond); g_mutex_clear (&demux->updates_timed_lock); g_cond_clear (&demux->updates_timed_cond); + g_mutex_clear (&demux->fragment_download_lock); + g_cond_clear (&demux->fragment_download_cond); G_OBJECT_CLASS (parent_class)->dispose (obj); } @@ -226,6 +231,8 @@ gst_hls_demux_init (GstHLSDemux * demux) g_cond_init (&demux->download_cond); g_mutex_init (&demux->updates_timed_lock); g_cond_init (&demux->updates_timed_cond); + g_mutex_init (&demux->fragment_download_lock); + g_cond_init (&demux->fragment_download_cond); /* Updates task */ g_rec_mutex_init (&demux->updates_lock); @@ -238,6 +245,9 @@ gst_hls_demux_init (GstHLSDemux * demux) demux->stream_task = gst_task_new ((GstTaskFunction) gst_hls_demux_stream_loop, demux, NULL); gst_task_set_lock (demux->stream_task, &demux->stream_lock); + demux->src = gst_element_factory_make ("souphttpsrc", "hls-download-src"); + gst_element_set_locked_state (demux->src, TRUE); + gst_bin_add (GST_BIN_CAST (demux), demux->src); demux->have_group_id = FALSE; demux->group_id = G_MAXUINT; @@ -305,6 +315,7 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_element_set_state (demux->src, GST_STATE_NULL); gst_hls_demux_stop (demux); gst_task_join (demux->updates_task); gst_task_join (demux->stream_task); @@ -687,12 +698,129 @@ gst_hls_demux_stop (GstHLSDemux * demux) demux->stop_stream_task = TRUE; g_cond_signal (&demux->download_cond); g_mutex_unlock (&demux->download_lock); + g_mutex_lock (&demux->fragment_download_lock); + g_cond_signal (&demux->fragment_download_cond); + g_mutex_unlock (&demux->fragment_download_lock); gst_task_stop (demux->stream_task); g_rec_mutex_lock (&demux->stream_lock); g_rec_mutex_unlock (&demux->stream_lock); } } +static GstFlowReturn +_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstPad *srcpad = (GstPad *) parent; + GstHLSDemux *demux = (GstHLSDemux *) GST_PAD_PARENT (srcpad); + GstFlowReturn ret; + GstCaps *caps; + + /* We actually need to do this every time we switch bitrate */ + if (G_UNLIKELY (demux->do_typefind)) { + caps = gst_type_find_helper_for_buffer (NULL, buffer, NULL); + if (G_UNLIKELY (!caps)) { + GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, + ("Could not determine type of stream"), (NULL)); + gst_buffer_unref (buffer); + return GST_FLOW_NOT_NEGOTIATED; + } + + if (!demux->input_caps || !gst_caps_is_equal (caps, demux->input_caps)) { + gst_caps_replace (&demux->input_caps, caps); + GST_INFO_OBJECT (demux, "Input source caps: %" GST_PTR_FORMAT, + demux->input_caps); + demux->do_typefind = FALSE; + } + gst_pad_set_caps (srcpad, caps); + gst_caps_unref (caps); + } + + if (demux->discont) { + GST_DEBUG_OBJECT (demux, "Marking fragment as discontinuous"); + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + demux->discont = FALSE; + } else if (demux->starting_fragment && demux->segment.rate < 0) { + /* Set DISCONT flag for every buffer in reverse playback mode + * as each fragment for its own has to be reversed */ + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + } else { + GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT); + } + + demux->starting_fragment = FALSE; + + GST_DEBUG_OBJECT (demux, "set fragment pts=%" GST_TIME_FORMAT, + GST_TIME_ARGS (demux->current_timestamp)); + + GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE; + GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE; + GST_BUFFER_PTS (buffer) = demux->current_timestamp; + + demux->segment.position = GST_BUFFER_TIMESTAMP (buffer); +#if 0 + if (demux->segment.rate > 0) + demux->segment.position += GST_BUFFER_DURATION (buf); +#endif + + if (demux->need_segment) { + /* And send a newsegment */ + GST_DEBUG_OBJECT (demux, "Sending segment event: %" + GST_SEGMENT_FORMAT, &demux->segment); + gst_pad_push_event (demux->srcpad, gst_event_new_segment (&demux->segment)); + demux->need_segment = FALSE; + } + + ret = gst_proxy_pad_chain_default (pad, parent, buffer); + + if (ret != GST_FLOW_OK) { + if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) { + GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL), + ("stream stopped, reason %s", gst_flow_get_name (ret))); + gst_pad_push_event (demux->srcpad, gst_event_new_eos ()); + } else { + GST_DEBUG_OBJECT (demux, "stream stopped, reason %s", + gst_flow_get_name (ret)); + } + gst_hls_demux_pause_tasks (demux); + } + + /* avoid having the source handle the same error again */ + ret = GST_FLOW_OK; + + return ret; +} + +static gboolean +_src_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstPad *srcpad = GST_PAD_CAST (parent); + GstHLSDemux *demux = (GstHLSDemux *) GST_PAD_PARENT (srcpad);; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + g_cond_signal (&demux->fragment_download_cond); + break; + default: + break; + } + + return TRUE; +} + +static gboolean +_src_query (GstPad * pad, GstObject * parent, GstQuery * query) +{ + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_ALLOCATION: + return FALSE; + break; + default: + break; + } + + return gst_pad_query_default (pad, parent, query); +} + static void switch_pads (GstHLSDemux * demux, GstCaps * newcaps) { @@ -700,14 +828,35 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps) GstEvent *event; gchar *stream_id; gchar *name; + GstPad *target; + GstPadTemplate *tmpl; + GstProxyPad *internal_pad; - GST_DEBUG ("Switching pads (oldpad:%p) with caps: %" GST_PTR_FORMAT, oldpad, + GST_DEBUG_OBJECT (demux, + "Switching pads (oldpad:%p) with caps: %" GST_PTR_FORMAT, oldpad, newcaps); + target = gst_element_get_static_pad (demux->src, "src"); + /* First create and activate new pad */ name = g_strdup_printf ("src_%u", demux->srcpad_counter++); - demux->srcpad = gst_pad_new_from_static_template (&srctemplate, name); + tmpl = gst_static_pad_template_get (&srctemplate); + demux->srcpad = gst_ghost_pad_new_from_template (name, target, tmpl); + gst_object_unref (tmpl); g_free (name); + gst_object_unref (target); + + /* set up our internal pad to drop all events from + * the http src we don't care about. On the chain function + * we just push the buffer forward, but this way hls can get + * the flow return from downstream */ + internal_pad = gst_proxy_pad_get_internal (GST_PROXY_PAD (demux->srcpad)); + gst_pad_set_chain_function (GST_PAD_CAST (internal_pad), _src_chain); + gst_pad_set_event_function (GST_PAD_CAST (internal_pad), _src_event); + /* need to set query otherwise deadlocks happen with allocation queries */ + gst_pad_set_query_function (GST_PAD_CAST (internal_pad), _src_query); + gst_object_unref (internal_pad); + gst_pad_set_event_function (demux->srcpad, GST_DEBUG_FUNCPTR (gst_hls_demux_src_event)); gst_pad_set_query_function (demux->srcpad, @@ -752,54 +901,28 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps) } static gboolean -gst_hls_demux_configure_src_pad (GstHLSDemux * demux, GstFragment * fragment) +gst_hls_demux_configure_src_pad (GstHLSDemux * demux, GstCaps * bufcaps) { - GstCaps *bufcaps = NULL, *srccaps = NULL; - GstBuffer *buf = NULL; + GstCaps *srccaps = NULL; /* Figure out if we need to create/switch pads */ if (G_LIKELY (demux->srcpad)) srccaps = gst_pad_get_current_caps (demux->srcpad); - if (fragment) { - bufcaps = gst_fragment_get_caps (fragment); - if (G_UNLIKELY (!bufcaps)) { - if (srccaps) - gst_caps_unref (srccaps); - return FALSE; - } - buf = gst_fragment_get_buffer (fragment); - } - if (G_UNLIKELY (!srccaps || demux->discont || (buf - && GST_BUFFER_IS_DISCONT (buf)))) { + if (G_UNLIKELY (!srccaps || demux->discont)) { switch_pads (demux, bufcaps); demux->need_segment = TRUE; - demux->discont = FALSE; - if (buf) - GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT); } if (bufcaps) gst_caps_unref (bufcaps); if (G_LIKELY (srccaps)) gst_caps_unref (srccaps); - if (demux->need_segment) { - /* And send a newsegment */ - GST_DEBUG_OBJECT (demux, "Sending segment event: %" - GST_SEGMENT_FORMAT, &demux->segment); - gst_pad_push_event (demux->srcpad, gst_event_new_segment (&demux->segment)); - demux->need_segment = FALSE; - } - if (buf) - gst_buffer_unref (buf); return TRUE; } static void gst_hls_demux_stream_loop (GstHLSDemux * demux) { - GstFragment *fragment; - GstBuffer *buf; - GstFlowReturn ret; gboolean end_of_playlist; GError *err = NULL; @@ -827,9 +950,7 @@ gst_hls_demux_stream_loop (GstHLSDemux * demux) } demux->next_download = g_get_monotonic_time (); - if ((fragment = - gst_hls_demux_get_next_fragment (demux, &end_of_playlist, - &err)) == NULL) { + if (!gst_hls_demux_get_next_fragment (demux, &end_of_playlist, &err)) { if (demux->stop_stream_task) { g_clear_error (&err); goto pause_task; @@ -914,45 +1035,20 @@ gst_hls_demux_stream_loop (GstHLSDemux * demux) gst_m3u8_client_advance_fragment (demux->client, demux->segment.rate > 0); if (demux->stop_updates_task) { - g_object_unref (fragment); goto pause_task; } } if (demux->stop_updates_task) { - g_object_unref (fragment); goto pause_task; } - - if (!gst_hls_demux_configure_src_pad (demux, fragment)) { - g_object_unref (fragment); - goto type_not_found; - } - - buf = gst_fragment_get_buffer (fragment); - - GST_DEBUG_OBJECT (demux, "Pushing buffer %" GST_TIME_FORMAT, - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); - - /* Set DISCONT flag for every buffer in reverse playback mode - * as each fragment for its own has to be reversed */ - if (demux->segment.rate < 0) { - GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT); - } - - demux->segment.position = GST_BUFFER_TIMESTAMP (buf); - if (demux->segment.rate > 0) - demux->segment.position += GST_BUFFER_DURATION (buf); - - ret = gst_pad_push (demux->srcpad, buf); - if (ret != GST_FLOW_OK) - goto error_pushing; +#if 0 /* try to switch to another bitrate if needed */ gst_hls_demux_switch_playlist (demux, fragment); - g_object_unref (fragment); +#endif - GST_DEBUG_OBJECT (demux, "Pushed buffer"); + GST_DEBUG_OBJECT (demux, "Finished pushing fragment"); return; @@ -967,29 +1063,6 @@ end_of_playlist: return; } -type_not_found: - { - GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, - ("Could not determine type of stream"), (NULL)); - gst_hls_demux_pause_tasks (demux); - return; - } - -error_pushing: - { - g_object_unref (fragment); - if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) { - GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL), - ("stream stopped, reason %s", gst_flow_get_name (ret))); - gst_pad_push_event (demux->srcpad, gst_event_new_eos ()); - } else { - GST_DEBUG_OBJECT (demux, "stream stopped, reason %s", - gst_flow_get_name (ret)); - } - gst_hls_demux_pause_tasks (demux); - return; - } - pause_task: { GST_DEBUG_OBJECT (demux, "Pause task"); @@ -1252,8 +1325,8 @@ gst_hls_demux_update_playlist (GstHLSDemux * demux, gboolean update, GST_M3U8_CLIENT_LOCK (demux->client); last_sequence = - GST_M3U8_MEDIA_FILE (g_list_last (demux->client->current->files)-> - data)->sequence; + GST_M3U8_MEDIA_FILE (g_list_last (demux->client->current-> + files)->data)->sequence; if (demux->client->sequence >= last_sequence - 3) { GST_DEBUG_OBJECT (demux, "Sequence is beyond playlist. Moving back to %u", @@ -1353,8 +1426,8 @@ retry_failover_protection: gst_m3u8_client_set_current (demux->client, previous_variant->data); /* Try a lower bitrate (or stop if we just tried the lowest) */ if (GST_M3U8 (previous_variant->data)->iframe && new_bandwidth == - GST_M3U8 (g_list_first (demux->client->main->iframe_lists)-> - data)->bandwidth) + GST_M3U8 (g_list_first (demux->client->main->iframe_lists)->data)-> + bandwidth) return FALSE; else if (!GST_M3U8 (previous_variant->data)->iframe && new_bandwidth == GST_M3U8 (g_list_first (demux->client->main->lists)->data)->bandwidth) @@ -1369,6 +1442,7 @@ retry_failover_protection: return TRUE; } +#if 0 static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux, GstFragment * fragment) { @@ -1411,7 +1485,9 @@ gst_hls_demux_switch_playlist (GstHLSDemux * demux, GstFragment * fragment) return gst_hls_demux_change_playlist (demux, bitrate * demux->bitrate_limit); } +#endif +#if 0 #ifdef HAVE_NETTLE static gboolean decrypt_fragment (GstHLSDemux * demux, gsize length, @@ -1549,16 +1625,15 @@ decrypt_error: g_object_unref (encrypted_fragment); return ret; } +#endif -static GstFragment * +static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean * end_of_playlist, GError ** err) { - GstFragment *download; const gchar *next_fragment_uri; GstClockTime duration; GstClockTime timestamp; - GstBuffer *buf; gboolean discont; gint64 range_start, range_end; const gchar *key = NULL; @@ -1570,74 +1645,43 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux, &key, &iv, demux->segment.rate > 0)) { GST_INFO_OBJECT (demux, "This playlist doesn't contain more fragments"); *end_of_playlist = TRUE; - return NULL; + return FALSE; } - GST_INFO_OBJECT (demux, + if (!gst_hls_demux_configure_src_pad (demux, NULL)) { + *end_of_playlist = FALSE; + return FALSE; + } + + g_mutex_lock (&demux->fragment_download_lock); + GST_DEBUG_OBJECT (demux, "Fetching next fragment %s (range=%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT ")", next_fragment_uri, range_start, range_end); - download = - gst_uri_downloader_fetch_uri_with_range_and_referer (demux->downloader, - next_fragment_uri, demux->client->main ? demux->client->main->uri : NULL, - FALSE, range_start, range_end, err); + /* set up our source for download */ + demux->current_timestamp = timestamp; + demux->starting_fragment = TRUE; + g_object_set (demux->src, "location", next_fragment_uri, NULL); + gst_element_set_state (demux->src, GST_STATE_READY); /* TODO check return */ + gst_element_send_event (demux->src, gst_event_new_seek (1.0, GST_FORMAT_BYTES, + (GstSeekFlags) GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, range_start, + GST_SEEK_TYPE_SET, range_end)); - if (download == NULL) - goto error; + gst_element_sync_state_with_parent (demux->src); + /* wait for the fragment to be completely downloaded */ + g_cond_wait (&demux->fragment_download_cond, &demux->fragment_download_lock); + + g_mutex_unlock (&demux->fragment_download_lock); + gst_element_set_state (demux->src, GST_STATE_NULL); + +#if 0 if (key) { download = gst_hls_demux_decrypt_fragment (demux, download, key, iv, err); if (download == NULL) goto error; } +#endif - buf = gst_fragment_get_buffer (download); - - GST_DEBUG_OBJECT (demux, "set fragment pts=%" GST_TIME_FORMAT " duration=%" - GST_TIME_FORMAT, GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration)); - - GST_BUFFER_DURATION (buf) = duration; - GST_BUFFER_PTS (buf) = timestamp; - GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE; - - /* We actually need to do this every time we switch bitrate */ - if (G_UNLIKELY (demux->do_typefind)) { - GstCaps *caps = gst_fragment_get_caps (download); - - if (G_UNLIKELY (!caps)) { - GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, - ("Could not determine type of stream"), (NULL)); - gst_buffer_unref (buf); - g_object_unref (download); - goto error; - } - - if (!demux->input_caps || !gst_caps_is_equal (caps, demux->input_caps)) { - gst_caps_replace (&demux->input_caps, caps); - /* gst_pad_set_caps (demux->srcpad, demux->input_caps); */ - GST_INFO_OBJECT (demux, "Input source caps: %" GST_PTR_FORMAT, - demux->input_caps); - demux->do_typefind = FALSE; - } - gst_caps_unref (caps); - } else { - gst_fragment_set_caps (download, demux->input_caps); - } - - if (discont) { - GST_DEBUG_OBJECT (demux, "Marking fragment as discontinuous"); - GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT); - } else { - GST_BUFFER_FLAG_UNSET (buf, GST_BUFFER_FLAG_DISCONT); - } - - /* The buffer ref is still kept inside the fragment download */ - gst_buffer_unref (buf); - - return download; - -error: - { - return NULL; - } + return TRUE; } diff --git a/ext/hls/gsthlsdemux.h b/ext/hls/gsthlsdemux.h index 57aa0a885..b7f2cdf37 100644 --- a/ext/hls/gsthlsdemux.h +++ b/ext/hls/gsthlsdemux.h @@ -53,7 +53,7 @@ typedef struct _GstHLSDemuxClass GstHLSDemuxClass; */ struct _GstHLSDemux { - GstElement parent; + GstBin parent; GstPad *sinkpad; GstPad *srcpad; @@ -103,11 +103,18 @@ struct _GstHLSDemux /* Current download rate (bps) */ gint current_download_rate; + + /* fragment download tooling */ + GstElement *src; + GMutex fragment_download_lock; + GCond fragment_download_cond; + GstClockTime current_timestamp; + gboolean starting_fragment; }; struct _GstHLSDemuxClass { - GstElementClass parent_class; + GstBinClass parent_class; }; GType gst_hls_demux_get_type (void); |