summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
authorThiago Santos <ts.santos@sisa.samsung.com>2014-04-07 13:57:26 -0300
committerThiago Santos <ts.santos@sisa.samsung.com>2014-04-29 18:17:07 -0300
commit36117595579cfde296c905f170e2b14ef8a6c534 (patch)
tree12855424735bb530f2b7bad02c8861ea0008f233 /ext
parenteee4f95a1fb7820d58a8f777f9b4a77aa441733e (diff)
downloadgstreamer-plugins-bad-36117595579cfde296c905f170e2b14ef8a6c534.tar.gz
hlsdemux: replace uridownloader with a GstElement
The GstElement is directly linked into a ghost pad and its buffers are pushed as received downstream. This way the buffers are small enough and not a whole fragment that usually causes extra latency and makes buffering harder
Diffstat (limited to 'ext')
-rw-r--r--ext/hls/gsthlsdemux.c346
-rw-r--r--ext/hls/gsthlsdemux.h11
2 files changed, 204 insertions, 153 deletions
diff --git a/ext/hls/gsthlsdemux.c b/ext/hls/gsthlsdemux.c
index 566809f30..11b67298d 100644
--- a/ext/hls/gsthlsdemux.c
+++ b/ext/hls/gsthlsdemux.c
@@ -47,6 +47,7 @@
#else
#include <gcrypt.h>
#endif
+#include <gst/base/gsttypefindhelper.h>
#include "gsthlsdemux.h"
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
@@ -101,9 +102,11 @@ static void gst_hls_demux_stream_loop (GstHLSDemux * demux);
static void gst_hls_demux_updates_loop (GstHLSDemux * demux);
static void gst_hls_demux_stop (GstHLSDemux * demux);
static void gst_hls_demux_pause_tasks (GstHLSDemux * demux);
+#if 0
static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux,
GstFragment * fragment);
-static GstFragment *gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
+#endif
+static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
gboolean * end_of_playlist, GError ** err);
static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux,
gboolean update, GError ** err);
@@ -116,7 +119,7 @@ static gboolean gst_hls_demux_change_playlist (GstHLSDemux * demux,
guint max_bitrate);
#define gst_hls_demux_parent_class parent_class
-G_DEFINE_TYPE (GstHLSDemux, gst_hls_demux, GST_TYPE_ELEMENT);
+G_DEFINE_TYPE (GstHLSDemux, gst_hls_demux, GST_TYPE_BIN);
static void
gst_hls_demux_dispose (GObject * obj)
@@ -146,6 +149,8 @@ gst_hls_demux_dispose (GObject * obj)
g_cond_clear (&demux->download_cond);
g_mutex_clear (&demux->updates_timed_lock);
g_cond_clear (&demux->updates_timed_cond);
+ g_mutex_clear (&demux->fragment_download_lock);
+ g_cond_clear (&demux->fragment_download_cond);
G_OBJECT_CLASS (parent_class)->dispose (obj);
}
@@ -226,6 +231,8 @@ gst_hls_demux_init (GstHLSDemux * demux)
g_cond_init (&demux->download_cond);
g_mutex_init (&demux->updates_timed_lock);
g_cond_init (&demux->updates_timed_cond);
+ g_mutex_init (&demux->fragment_download_lock);
+ g_cond_init (&demux->fragment_download_cond);
/* Updates task */
g_rec_mutex_init (&demux->updates_lock);
@@ -238,6 +245,9 @@ gst_hls_demux_init (GstHLSDemux * demux)
demux->stream_task =
gst_task_new ((GstTaskFunction) gst_hls_demux_stream_loop, demux, NULL);
gst_task_set_lock (demux->stream_task, &demux->stream_lock);
+ demux->src = gst_element_factory_make ("souphttpsrc", "hls-download-src");
+ gst_element_set_locked_state (demux->src, TRUE);
+ gst_bin_add (GST_BIN_CAST (demux), demux->src);
demux->have_group_id = FALSE;
demux->group_id = G_MAXUINT;
@@ -305,6 +315,7 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
+ gst_element_set_state (demux->src, GST_STATE_NULL);
gst_hls_demux_stop (demux);
gst_task_join (demux->updates_task);
gst_task_join (demux->stream_task);
@@ -687,12 +698,129 @@ gst_hls_demux_stop (GstHLSDemux * demux)
demux->stop_stream_task = TRUE;
g_cond_signal (&demux->download_cond);
g_mutex_unlock (&demux->download_lock);
+ g_mutex_lock (&demux->fragment_download_lock);
+ g_cond_signal (&demux->fragment_download_cond);
+ g_mutex_unlock (&demux->fragment_download_lock);
gst_task_stop (demux->stream_task);
g_rec_mutex_lock (&demux->stream_lock);
g_rec_mutex_unlock (&demux->stream_lock);
}
}
+static GstFlowReturn
+_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+ GstPad *srcpad = (GstPad *) parent;
+ GstHLSDemux *demux = (GstHLSDemux *) GST_PAD_PARENT (srcpad);
+ GstFlowReturn ret;
+ GstCaps *caps;
+
+ /* We actually need to do this every time we switch bitrate */
+ if (G_UNLIKELY (demux->do_typefind)) {
+ caps = gst_type_find_helper_for_buffer (NULL, buffer, NULL);
+ if (G_UNLIKELY (!caps)) {
+ GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
+ ("Could not determine type of stream"), (NULL));
+ gst_buffer_unref (buffer);
+ return GST_FLOW_NOT_NEGOTIATED;
+ }
+
+ if (!demux->input_caps || !gst_caps_is_equal (caps, demux->input_caps)) {
+ gst_caps_replace (&demux->input_caps, caps);
+ GST_INFO_OBJECT (demux, "Input source caps: %" GST_PTR_FORMAT,
+ demux->input_caps);
+ demux->do_typefind = FALSE;
+ }
+ gst_pad_set_caps (srcpad, caps);
+ gst_caps_unref (caps);
+ }
+
+ if (demux->discont) {
+ GST_DEBUG_OBJECT (demux, "Marking fragment as discontinuous");
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+ demux->discont = FALSE;
+ } else if (demux->starting_fragment && demux->segment.rate < 0) {
+ /* Set DISCONT flag for every buffer in reverse playback mode
+ * as each fragment for its own has to be reversed */
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+ } else {
+ GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
+ }
+
+ demux->starting_fragment = FALSE;
+
+ GST_DEBUG_OBJECT (demux, "set fragment pts=%" GST_TIME_FORMAT,
+ GST_TIME_ARGS (demux->current_timestamp));
+
+ GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
+ GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
+ GST_BUFFER_PTS (buffer) = demux->current_timestamp;
+
+ demux->segment.position = GST_BUFFER_TIMESTAMP (buffer);
+#if 0
+ if (demux->segment.rate > 0)
+ demux->segment.position += GST_BUFFER_DURATION (buf);
+#endif
+
+ if (demux->need_segment) {
+ /* And send a newsegment */
+ GST_DEBUG_OBJECT (demux, "Sending segment event: %"
+ GST_SEGMENT_FORMAT, &demux->segment);
+ gst_pad_push_event (demux->srcpad, gst_event_new_segment (&demux->segment));
+ demux->need_segment = FALSE;
+ }
+
+ ret = gst_proxy_pad_chain_default (pad, parent, buffer);
+
+ if (ret != GST_FLOW_OK) {
+ if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
+ GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL),
+ ("stream stopped, reason %s", gst_flow_get_name (ret)));
+ gst_pad_push_event (demux->srcpad, gst_event_new_eos ());
+ } else {
+ GST_DEBUG_OBJECT (demux, "stream stopped, reason %s",
+ gst_flow_get_name (ret));
+ }
+ gst_hls_demux_pause_tasks (demux);
+ }
+
+ /* avoid having the source handle the same error again */
+ ret = GST_FLOW_OK;
+
+ return ret;
+}
+
+static gboolean
+_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GstPad *srcpad = GST_PAD_CAST (parent);
+ GstHLSDemux *demux = (GstHLSDemux *) GST_PAD_PARENT (srcpad);;
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_EOS:
+ g_cond_signal (&demux->fragment_download_cond);
+ break;
+ default:
+ break;
+ }
+
+ return TRUE;
+}
+
+static gboolean
+_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
+{
+ switch (GST_QUERY_TYPE (query)) {
+ case GST_QUERY_ALLOCATION:
+ return FALSE;
+ break;
+ default:
+ break;
+ }
+
+ return gst_pad_query_default (pad, parent, query);
+}
+
static void
switch_pads (GstHLSDemux * demux, GstCaps * newcaps)
{
@@ -700,14 +828,35 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps)
GstEvent *event;
gchar *stream_id;
gchar *name;
+ GstPad *target;
+ GstPadTemplate *tmpl;
+ GstProxyPad *internal_pad;
- GST_DEBUG ("Switching pads (oldpad:%p) with caps: %" GST_PTR_FORMAT, oldpad,
+ GST_DEBUG_OBJECT (demux,
+ "Switching pads (oldpad:%p) with caps: %" GST_PTR_FORMAT, oldpad,
newcaps);
+ target = gst_element_get_static_pad (demux->src, "src");
+
/* First create and activate new pad */
name = g_strdup_printf ("src_%u", demux->srcpad_counter++);
- demux->srcpad = gst_pad_new_from_static_template (&srctemplate, name);
+ tmpl = gst_static_pad_template_get (&srctemplate);
+ demux->srcpad = gst_ghost_pad_new_from_template (name, target, tmpl);
+ gst_object_unref (tmpl);
g_free (name);
+ gst_object_unref (target);
+
+ /* set up our internal pad to drop all events from
+ * the http src we don't care about. On the chain function
+ * we just push the buffer forward, but this way hls can get
+ * the flow return from downstream */
+ internal_pad = gst_proxy_pad_get_internal (GST_PROXY_PAD (demux->srcpad));
+ gst_pad_set_chain_function (GST_PAD_CAST (internal_pad), _src_chain);
+ gst_pad_set_event_function (GST_PAD_CAST (internal_pad), _src_event);
+ /* need to set query otherwise deadlocks happen with allocation queries */
+ gst_pad_set_query_function (GST_PAD_CAST (internal_pad), _src_query);
+ gst_object_unref (internal_pad);
+
gst_pad_set_event_function (demux->srcpad,
GST_DEBUG_FUNCPTR (gst_hls_demux_src_event));
gst_pad_set_query_function (demux->srcpad,
@@ -752,54 +901,28 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps)
}
static gboolean
-gst_hls_demux_configure_src_pad (GstHLSDemux * demux, GstFragment * fragment)
+gst_hls_demux_configure_src_pad (GstHLSDemux * demux, GstCaps * bufcaps)
{
- GstCaps *bufcaps = NULL, *srccaps = NULL;
- GstBuffer *buf = NULL;
+ GstCaps *srccaps = NULL;
/* Figure out if we need to create/switch pads */
if (G_LIKELY (demux->srcpad))
srccaps = gst_pad_get_current_caps (demux->srcpad);
- if (fragment) {
- bufcaps = gst_fragment_get_caps (fragment);
- if (G_UNLIKELY (!bufcaps)) {
- if (srccaps)
- gst_caps_unref (srccaps);
- return FALSE;
- }
- buf = gst_fragment_get_buffer (fragment);
- }
- if (G_UNLIKELY (!srccaps || demux->discont || (buf
- && GST_BUFFER_IS_DISCONT (buf)))) {
+ if (G_UNLIKELY (!srccaps || demux->discont)) {
switch_pads (demux, bufcaps);
demux->need_segment = TRUE;
- demux->discont = FALSE;
- if (buf)
- GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
}
if (bufcaps)
gst_caps_unref (bufcaps);
if (G_LIKELY (srccaps))
gst_caps_unref (srccaps);
- if (demux->need_segment) {
- /* And send a newsegment */
- GST_DEBUG_OBJECT (demux, "Sending segment event: %"
- GST_SEGMENT_FORMAT, &demux->segment);
- gst_pad_push_event (demux->srcpad, gst_event_new_segment (&demux->segment));
- demux->need_segment = FALSE;
- }
- if (buf)
- gst_buffer_unref (buf);
return TRUE;
}
static void
gst_hls_demux_stream_loop (GstHLSDemux * demux)
{
- GstFragment *fragment;
- GstBuffer *buf;
- GstFlowReturn ret;
gboolean end_of_playlist;
GError *err = NULL;
@@ -827,9 +950,7 @@ gst_hls_demux_stream_loop (GstHLSDemux * demux)
}
demux->next_download = g_get_monotonic_time ();
- if ((fragment =
- gst_hls_demux_get_next_fragment (demux, &end_of_playlist,
- &err)) == NULL) {
+ if (!gst_hls_demux_get_next_fragment (demux, &end_of_playlist, &err)) {
if (demux->stop_stream_task) {
g_clear_error (&err);
goto pause_task;
@@ -914,45 +1035,20 @@ gst_hls_demux_stream_loop (GstHLSDemux * demux)
gst_m3u8_client_advance_fragment (demux->client, demux->segment.rate > 0);
if (demux->stop_updates_task) {
- g_object_unref (fragment);
goto pause_task;
}
}
if (demux->stop_updates_task) {
- g_object_unref (fragment);
goto pause_task;
}
-
- if (!gst_hls_demux_configure_src_pad (demux, fragment)) {
- g_object_unref (fragment);
- goto type_not_found;
- }
-
- buf = gst_fragment_get_buffer (fragment);
-
- GST_DEBUG_OBJECT (demux, "Pushing buffer %" GST_TIME_FORMAT,
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
-
- /* Set DISCONT flag for every buffer in reverse playback mode
- * as each fragment for its own has to be reversed */
- if (demux->segment.rate < 0) {
- GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
- }
-
- demux->segment.position = GST_BUFFER_TIMESTAMP (buf);
- if (demux->segment.rate > 0)
- demux->segment.position += GST_BUFFER_DURATION (buf);
-
- ret = gst_pad_push (demux->srcpad, buf);
- if (ret != GST_FLOW_OK)
- goto error_pushing;
+#if 0
/* try to switch to another bitrate if needed */
gst_hls_demux_switch_playlist (demux, fragment);
- g_object_unref (fragment);
+#endif
- GST_DEBUG_OBJECT (demux, "Pushed buffer");
+ GST_DEBUG_OBJECT (demux, "Finished pushing fragment");
return;
@@ -967,29 +1063,6 @@ end_of_playlist:
return;
}
-type_not_found:
- {
- GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
- ("Could not determine type of stream"), (NULL));
- gst_hls_demux_pause_tasks (demux);
- return;
- }
-
-error_pushing:
- {
- g_object_unref (fragment);
- if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
- GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL),
- ("stream stopped, reason %s", gst_flow_get_name (ret)));
- gst_pad_push_event (demux->srcpad, gst_event_new_eos ());
- } else {
- GST_DEBUG_OBJECT (demux, "stream stopped, reason %s",
- gst_flow_get_name (ret));
- }
- gst_hls_demux_pause_tasks (demux);
- return;
- }
-
pause_task:
{
GST_DEBUG_OBJECT (demux, "Pause task");
@@ -1252,8 +1325,8 @@ gst_hls_demux_update_playlist (GstHLSDemux * demux, gboolean update,
GST_M3U8_CLIENT_LOCK (demux->client);
last_sequence =
- GST_M3U8_MEDIA_FILE (g_list_last (demux->client->current->files)->
- data)->sequence;
+ GST_M3U8_MEDIA_FILE (g_list_last (demux->client->current->
+ files)->data)->sequence;
if (demux->client->sequence >= last_sequence - 3) {
GST_DEBUG_OBJECT (demux, "Sequence is beyond playlist. Moving back to %u",
@@ -1353,8 +1426,8 @@ retry_failover_protection:
gst_m3u8_client_set_current (demux->client, previous_variant->data);
/* Try a lower bitrate (or stop if we just tried the lowest) */
if (GST_M3U8 (previous_variant->data)->iframe && new_bandwidth ==
- GST_M3U8 (g_list_first (demux->client->main->iframe_lists)->
- data)->bandwidth)
+ GST_M3U8 (g_list_first (demux->client->main->iframe_lists)->data)->
+ bandwidth)
return FALSE;
else if (!GST_M3U8 (previous_variant->data)->iframe && new_bandwidth ==
GST_M3U8 (g_list_first (demux->client->main->lists)->data)->bandwidth)
@@ -1369,6 +1442,7 @@ retry_failover_protection:
return TRUE;
}
+#if 0
static gboolean
gst_hls_demux_switch_playlist (GstHLSDemux * demux, GstFragment * fragment)
{
@@ -1411,7 +1485,9 @@ gst_hls_demux_switch_playlist (GstHLSDemux * demux, GstFragment * fragment)
return gst_hls_demux_change_playlist (demux, bitrate * demux->bitrate_limit);
}
+#endif
+#if 0
#ifdef HAVE_NETTLE
static gboolean
decrypt_fragment (GstHLSDemux * demux, gsize length,
@@ -1549,16 +1625,15 @@ decrypt_error:
g_object_unref (encrypted_fragment);
return ret;
}
+#endif
-static GstFragment *
+static gboolean
gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
gboolean * end_of_playlist, GError ** err)
{
- GstFragment *download;
const gchar *next_fragment_uri;
GstClockTime duration;
GstClockTime timestamp;
- GstBuffer *buf;
gboolean discont;
gint64 range_start, range_end;
const gchar *key = NULL;
@@ -1570,74 +1645,43 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
&key, &iv, demux->segment.rate > 0)) {
GST_INFO_OBJECT (demux, "This playlist doesn't contain more fragments");
*end_of_playlist = TRUE;
- return NULL;
+ return FALSE;
}
- GST_INFO_OBJECT (demux,
+ if (!gst_hls_demux_configure_src_pad (demux, NULL)) {
+ *end_of_playlist = FALSE;
+ return FALSE;
+ }
+
+ g_mutex_lock (&demux->fragment_download_lock);
+ GST_DEBUG_OBJECT (demux,
"Fetching next fragment %s (range=%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT
")", next_fragment_uri, range_start, range_end);
- download =
- gst_uri_downloader_fetch_uri_with_range_and_referer (demux->downloader,
- next_fragment_uri, demux->client->main ? demux->client->main->uri : NULL,
- FALSE, range_start, range_end, err);
+ /* set up our source for download */
+ demux->current_timestamp = timestamp;
+ demux->starting_fragment = TRUE;
+ g_object_set (demux->src, "location", next_fragment_uri, NULL);
+ gst_element_set_state (demux->src, GST_STATE_READY); /* TODO check return */
+ gst_element_send_event (demux->src, gst_event_new_seek (1.0, GST_FORMAT_BYTES,
+ (GstSeekFlags) GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, range_start,
+ GST_SEEK_TYPE_SET, range_end));
- if (download == NULL)
- goto error;
+ gst_element_sync_state_with_parent (demux->src);
+ /* wait for the fragment to be completely downloaded */
+ g_cond_wait (&demux->fragment_download_cond, &demux->fragment_download_lock);
+
+ g_mutex_unlock (&demux->fragment_download_lock);
+ gst_element_set_state (demux->src, GST_STATE_NULL);
+
+#if 0
if (key) {
download = gst_hls_demux_decrypt_fragment (demux, download, key, iv, err);
if (download == NULL)
goto error;
}
+#endif
- buf = gst_fragment_get_buffer (download);
-
- GST_DEBUG_OBJECT (demux, "set fragment pts=%" GST_TIME_FORMAT " duration=%"
- GST_TIME_FORMAT, GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
-
- GST_BUFFER_DURATION (buf) = duration;
- GST_BUFFER_PTS (buf) = timestamp;
- GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE;
-
- /* We actually need to do this every time we switch bitrate */
- if (G_UNLIKELY (demux->do_typefind)) {
- GstCaps *caps = gst_fragment_get_caps (download);
-
- if (G_UNLIKELY (!caps)) {
- GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
- ("Could not determine type of stream"), (NULL));
- gst_buffer_unref (buf);
- g_object_unref (download);
- goto error;
- }
-
- if (!demux->input_caps || !gst_caps_is_equal (caps, demux->input_caps)) {
- gst_caps_replace (&demux->input_caps, caps);
- /* gst_pad_set_caps (demux->srcpad, demux->input_caps); */
- GST_INFO_OBJECT (demux, "Input source caps: %" GST_PTR_FORMAT,
- demux->input_caps);
- demux->do_typefind = FALSE;
- }
- gst_caps_unref (caps);
- } else {
- gst_fragment_set_caps (download, demux->input_caps);
- }
-
- if (discont) {
- GST_DEBUG_OBJECT (demux, "Marking fragment as discontinuous");
- GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
- } else {
- GST_BUFFER_FLAG_UNSET (buf, GST_BUFFER_FLAG_DISCONT);
- }
-
- /* The buffer ref is still kept inside the fragment download */
- gst_buffer_unref (buf);
-
- return download;
-
-error:
- {
- return NULL;
- }
+ return TRUE;
}
diff --git a/ext/hls/gsthlsdemux.h b/ext/hls/gsthlsdemux.h
index 57aa0a885..b7f2cdf37 100644
--- a/ext/hls/gsthlsdemux.h
+++ b/ext/hls/gsthlsdemux.h
@@ -53,7 +53,7 @@ typedef struct _GstHLSDemuxClass GstHLSDemuxClass;
*/
struct _GstHLSDemux
{
- GstElement parent;
+ GstBin parent;
GstPad *sinkpad;
GstPad *srcpad;
@@ -103,11 +103,18 @@ struct _GstHLSDemux
/* Current download rate (bps) */
gint current_download_rate;
+
+ /* fragment download tooling */
+ GstElement *src;
+ GMutex fragment_download_lock;
+ GCond fragment_download_cond;
+ GstClockTime current_timestamp;
+ gboolean starting_fragment;
};
struct _GstHLSDemuxClass
{
- GstElementClass parent_class;
+ GstBinClass parent_class;
};
GType gst_hls_demux_get_type (void);