summaryrefslogtreecommitdiff
path: root/ext/smoothstreaming
diff options
context:
space:
mode:
authorThiago Santos <thiago.sousa.santos@collabora.com>2013-01-14 13:21:10 -0300
committerThiago Santos <thiago.sousa.santos@collabora.com>2013-05-07 21:05:12 -0300
commitc2ae981e6d6cba1e5b7ca62837a2e2f00694892e (patch)
treee5edd52e00bc58c2bd619dfd7e37bbaa0c34c9a6 /ext/smoothstreaming
parentfba63178fe9e70eb5bd6bd80cce995541820245c (diff)
downloadgstreamer-plugins-bad-c2ae981e6d6cba1e5b7ca62837a2e2f00694892e.tar.gz
mssdemux: rewriting pad tasks so that buffers are pushed by ts order
Use pad tasks to download data and an extra task that gets the earlier buffer (with the smallest timestamp) and pushes on the corresponding pad. This prevents that the audio stream rushes ahead on buffers as its fragments should be smaller
Diffstat (limited to 'ext/smoothstreaming')
-rw-r--r--ext/smoothstreaming/gstmssdemux.c354
-rw-r--r--ext/smoothstreaming/gstmssdemux.h14
2 files changed, 314 insertions, 54 deletions
diff --git a/ext/smoothstreaming/gstmssdemux.c b/ext/smoothstreaming/gstmssdemux.c
index 8e4e263fe..a596726c6 100644
--- a/ext/smoothstreaming/gstmssdemux.c
+++ b/ext/smoothstreaming/gstmssdemux.c
@@ -85,7 +85,8 @@ static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event);
static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query);
-static void gst_mss_demux_stream_loop (GstMssDemuxStream * stream);
+static void gst_mss_demux_download_loop (GstMssDemuxStream * stream);
+static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux);
static void gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
@@ -143,6 +144,23 @@ gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass)
gst_pad_set_event_function (mssdemux->sinkpad,
GST_DEBUG_FUNCPTR (gst_mss_demux_event));
gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
+
+ g_static_rec_mutex_init (&mssdemux->stream_lock);
+ mssdemux->stream_task =
+ gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, mssdemux);
+ gst_task_set_lock (mssdemux->stream_task, &mssdemux->stream_lock);
+}
+
+static gboolean
+_data_queue_check_full (GstDataQueue * queue, guint visible, guint bytes,
+ guint64 time, gpointer checkdata)
+{
+ GstMssDemuxStream *stream = checkdata;
+ GstMssDemux *mssdemux = stream->parent;
+
+ if (mssdemux->data_queue_max_size == 0)
+ return FALSE; /* never full */
+ return visible >= mssdemux->data_queue_max_size;
}
static GstMssDemuxStream *
@@ -153,12 +171,13 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux,
stream = g_new0 (GstMssDemuxStream, 1);
stream->downloader = gst_uri_downloader_new ();
+ stream->dataqueue = gst_data_queue_new (_data_queue_check_full, stream);
- /* Streaming task */
- g_static_rec_mutex_init (&stream->stream_lock);
- stream->stream_task =
- gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, stream);
- gst_task_set_lock (stream->stream_task, &stream->stream_lock);
+ /* Downloading task */
+ g_static_rec_mutex_init (&stream->download_lock);
+ stream->download_task =
+ gst_task_create ((GstTaskFunction) gst_mss_demux_download_loop, stream);
+ gst_task_set_lock (stream->download_task, &stream->download_lock);
stream->pad = srcpad;
stream->manifest_stream = manifeststream;
@@ -170,20 +189,20 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux,
static void
gst_mss_demux_stream_free (GstMssDemuxStream * stream)
{
- if (stream->stream_task) {
- if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED) {
+ if (stream->download_task) {
+ if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
GST_DEBUG_PAD_NAME (stream->pad));
- gst_task_stop (stream->stream_task);
- g_static_rec_mutex_lock (&stream->stream_lock);
- g_static_rec_mutex_unlock (&stream->stream_lock);
+ gst_task_stop (stream->download_task);
+ g_static_rec_mutex_lock (&stream->download_lock);
+ g_static_rec_mutex_unlock (&stream->download_lock);
GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
- gst_task_join (stream->stream_task);
+ gst_task_join (stream->download_task);
GST_LOG_OBJECT (stream->parent, "Finished");
}
- gst_object_unref (stream->stream_task);
- g_static_rec_mutex_free (&stream->stream_lock);
- stream->stream_task = NULL;
+ gst_object_unref (stream->download_task);
+ g_static_rec_mutex_free (&stream->download_lock);
+ stream->download_task = NULL;
}
if (stream->pending_newsegment) {
@@ -196,6 +215,10 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
g_object_unref (stream->downloader);
stream->downloader = NULL;
}
+ if (stream->dataqueue) {
+ g_object_unref (stream->dataqueue);
+ stream->dataqueue = NULL;
+ }
if (stream->pad) {
gst_object_unref (stream->pad);
stream->pad = NULL;
@@ -207,6 +230,14 @@ static void
gst_mss_demux_reset (GstMssDemux * mssdemux)
{
GSList *iter;
+
+ if (GST_TASK_STATE (mssdemux->stream_task) != GST_TASK_STOPPED) {
+ gst_task_stop (mssdemux->stream_task);
+ g_static_rec_mutex_lock (&mssdemux->stream_lock);
+ g_static_rec_mutex_unlock (&mssdemux->stream_lock);
+ gst_task_join (mssdemux->stream_task);
+ }
+
if (mssdemux->manifest_buffer) {
gst_buffer_unref (mssdemux->manifest_buffer);
mssdemux->manifest_buffer = NULL;
@@ -233,7 +264,13 @@ gst_mss_demux_reset (GstMssDemux * mssdemux)
static void
gst_mss_demux_dispose (GObject * object)
{
- /* GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object); */
+ GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object);
+
+ if (mssdemux->stream_task) {
+ gst_object_unref (mssdemux->stream_task);
+ g_static_rec_mutex_free (&mssdemux->stream_lock);
+ mssdemux->stream_task = NULL;
+ }
G_OBJECT_CLASS (parent_class)->dispose (object);
}
@@ -325,8 +362,10 @@ gst_mss_demux_start (GstMssDemux * mssdemux)
GST_INFO_OBJECT (mssdemux, "Starting streams' tasks");
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
- gst_task_start (stream->stream_task);
+ gst_task_start (stream->download_task);
}
+
+ gst_task_start (mssdemux->stream_task);
}
static gboolean
@@ -378,17 +417,23 @@ static void
gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
{
GSList *iter;
+
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
+ gst_data_queue_set_flushing (stream->dataqueue, TRUE);
+
if (immediate)
gst_uri_downloader_cancel (stream->downloader);
- gst_task_pause (stream->stream_task);
+ gst_task_pause (stream->download_task);
}
+ gst_task_pause (mssdemux->stream_task);
+
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
- g_static_rec_mutex_lock (&stream->stream_lock);
+ g_static_rec_mutex_lock (&stream->download_lock);
}
+ g_static_rec_mutex_lock (&mssdemux->stream_lock);
}
static void
@@ -397,13 +442,16 @@ gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
GSList *iter;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
- g_static_rec_mutex_unlock (&stream->stream_lock);
+ g_static_rec_mutex_unlock (&stream->download_lock);
}
+ g_static_rec_mutex_unlock (&mssdemux->stream_lock);
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
- gst_task_start (stream->stream_task);
+ gst_data_queue_set_flushing (stream->dataqueue, FALSE);
+ gst_task_start (stream->download_task);
}
+ gst_task_start (mssdemux->stream_task);
}
static gboolean
@@ -458,6 +506,8 @@ gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
+ stream->eos = FALSE;
+ gst_data_queue_flush (stream->dataqueue);
stream->pending_newsegment = gst_event_ref (newsegment);
}
gst_event_unref (newsegment);
@@ -727,7 +777,7 @@ gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
GSList *oldpads = NULL;
GSList *iter;
- gst_mss_demux_stop_tasks (mssdemux, FALSE);
+ gst_mss_demux_stop_tasks (mssdemux, TRUE);
if (gst_mss_manifest_change_bitrate (mssdemux->manifest,
mssdemux->connection_speed)) {
@@ -736,15 +786,46 @@ gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
GstPad *oldpad = stream->pad;
- GstClockTime ts =
- gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
+ GstClockTime ts = GST_CLOCK_TIME_NONE;
oldpads = g_slist_prepend (oldpads, oldpad);
+ /* since we are flushing the queue, get the next un-pushed timestamp to seek
+ * and avoid gaps */
+ gst_data_queue_set_flushing (stream->dataqueue, FALSE);
+ if (!gst_data_queue_is_empty (stream->dataqueue)) {
+ GstDataQueueItem *item = NULL;
+
+ while (!gst_data_queue_is_empty (stream->dataqueue)
+ && !GST_CLOCK_TIME_IS_VALID (ts)) {
+ gst_data_queue_pop (stream->dataqueue, &item);
+
+ if (!item) {
+ g_assert_not_reached ();
+ break;
+ }
+
+ if (GST_IS_BUFFER (item->object)) {
+ GstBuffer *buffer = GST_BUFFER_CAST (item->object);
+
+ ts = GST_BUFFER_TIMESTAMP (buffer);
+ }
+ item->destroy (item);
+ }
+
+ }
+ if (!GST_CLOCK_TIME_IS_VALID (ts)) {
+ ts = gst_mss_stream_get_fragment_gst_timestamp
+ (stream->manifest_stream);
+ }
+
+ GST_DEBUG_OBJECT (mssdemux,
+ "Seeking stream %p %s to ts %" GST_TIME_FORMAT, stream,
+ GST_PAD_NAME (stream->pad), GST_TIME_ARGS (ts));
+ gst_mss_stream_seek (stream->manifest_stream, ts);
+ gst_data_queue_flush (stream->dataqueue);
+
stream->pad = _create_pad (mssdemux, stream->manifest_stream);
- /* TODO keep the same playback rate */
- stream->pending_newsegment =
- gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, ts, -1, ts);
gst_mss_demux_expose_stream (mssdemux, stream);
gst_pad_push_event (oldpad, gst_event_new_eos ());
@@ -763,6 +844,37 @@ gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
gst_mss_demux_restart_tasks (mssdemux);
}
+static void
+_free_data_queue_item (gpointer obj)
+{
+ GstDataQueueItem *item = obj;
+
+ gst_mini_object_unref (item->object);
+ g_slice_free (GstDataQueueItem, item);
+}
+
+static void
+gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
+ GstMiniObject * obj)
+{
+ GstDataQueueItem *item;
+
+ item = g_slice_new (GstDataQueueItem);
+ item->object = (GstMiniObject *) obj;
+
+ item->duration = 0; /* we don't care */
+ item->size = 0;
+ item->visible = TRUE;
+
+ item->destroy = (GDestroyNotify) _free_data_queue_item;
+
+ if (!gst_data_queue_push (stream->dataqueue, item)) {
+ GST_DEBUG_OBJECT (stream->parent, "Failed to store object %p", obj);
+ gst_mini_object_unref (obj);
+ g_slice_free (GstDataQueueItem, item);
+ }
+}
+
static GstFlowReturn
gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
GstBuffer ** buffer)
@@ -811,7 +923,17 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
GST_BUFFER_DURATION (_buffer) =
gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
- *buffer = _buffer;
+ if (buffer)
+ *buffer = _buffer;
+
+ if (_buffer) {
+ GST_DEBUG_OBJECT (mssdemux,
+ "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT,
+ stream, GST_PAD_NAME (stream->pad),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)));
+ gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
+ }
+
return ret;
no_url_error:
@@ -819,24 +941,118 @@ no_url_error:
GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
(_("Failed to get fragment URL.")),
("An error happened when getting fragment URL"));
- gst_task_stop (stream->stream_task);
+ gst_task_stop (stream->download_task);
return GST_FLOW_ERROR;
}
error:
{
GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
- gst_task_stop (stream->stream_task);
+ gst_task_stop (stream->download_task);
return GST_FLOW_ERROR;
}
}
static void
-gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
+gst_mss_demux_download_loop (GstMssDemuxStream * stream)
{
GstMssDemux *mssdemux = stream->parent;
GstBuffer *buffer = NULL;
GstFlowReturn ret;
+ GST_LOG_OBJECT (mssdemux, "download loop start %p", stream);
+
+
+ ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
+ switch (ret) {
+ case GST_FLOW_OK:
+ break; /* all is good, let's go */
+ case GST_FLOW_UNEXPECTED: /* EOS */
+ goto eos;
+ case GST_FLOW_ERROR:
+ goto error;
+ default:
+ break;
+ }
+
+ g_assert (buffer != NULL);
+
+ gst_mss_stream_advance_fragment (stream->manifest_stream);
+ GST_LOG_OBJECT (mssdemux, "download loop end %p", stream);
+ return;
+
+eos:
+ {
+ GST_DEBUG_OBJECT (mssdemux, "Storing EOS for pad %s:%s",
+ GST_DEBUG_PAD_NAME (stream->pad));
+ gst_mss_demux_stream_store_object (stream,
+ GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
+ gst_task_stop (stream->download_task);
+ return;
+ }
+error:
+ {
+ GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
+ gst_task_stop (stream->download_task);
+ return;
+ }
+}
+
+static GstFlowReturn
+gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
+ GstMssDemuxStream ** stream)
+{
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstMssDemuxStream *current = NULL;
+ GstClockTime cur_time = GST_CLOCK_TIME_NONE;
+ GSList *iter;
+
+ if (!mssdemux->streams)
+ return GST_FLOW_ERROR;
+
+ for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+ GstClockTime time;
+ GstMssDemuxStream *other;
+ GstDataQueueItem *item;
+
+ other = iter->data;
+ if (other->eos) {
+ continue;
+ }
+
+ if (gst_data_queue_peek (other->dataqueue, &item)) {
+ } else {
+ /* flushing */
+ return GST_FLOW_WRONG_STATE;
+ }
+
+ if (GST_IS_EVENT (item->object)) {
+ /* events have higher priority */
+ current = other;
+ break;
+ }
+ time = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (item->object));
+ if (time < cur_time) {
+ cur_time = time;
+ current = other;
+ }
+ }
+
+ *stream = current;
+ if (current == NULL)
+ ret = GST_FLOW_UNEXPECTED;
+ return ret;
+}
+
+static void
+gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
+{
+ GstMssDemuxStream *stream = NULL;
+ GstFlowReturn ret;
+ GstMiniObject *object = NULL;
+ GstDataQueueItem *item = NULL;
+
+ GST_LOG_OBJECT (mssdemux, "Starting stream loop");
+
GST_OBJECT_LOCK (mssdemux);
if (mssdemux->update_bitrates) {
mssdemux->update_bitrates = FALSE;
@@ -844,37 +1060,70 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
GST_DEBUG_OBJECT (mssdemux,
"Starting streams reconfiguration due to bitrate changes");
- g_thread_create ((GThreadFunc) gst_mss_demux_reconfigure, mssdemux, FALSE,
- NULL);
+ gst_mss_demux_reconfigure (mssdemux);
GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
- gst_task_stop (stream->stream_task);
- return;
} else {
GST_OBJECT_UNLOCK (mssdemux);
}
- ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
+ ret = gst_mss_demux_select_latest_stream (mssdemux, &stream);
+
+ if (stream)
+ GST_DEBUG_OBJECT (mssdemux,
+ "Stream loop selected %p stream of pad %s. %d - %s", stream,
+ GST_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
+ else
+ GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
+ gst_flow_get_name (ret));
+
switch (ret) {
case GST_FLOW_OK:
- break; /* all is good, let's go */
- case GST_FLOW_UNEXPECTED: /* EOS */
- goto eos;
+ break;
case GST_FLOW_ERROR:
goto error;
+ case GST_FLOW_UNEXPECTED:
+ goto eos;
+ case GST_FLOW_WRONG_STATE:
+ GST_DEBUG_OBJECT (mssdemux, "Wrong state, stopping task");
+ goto stop;
default:
- break;
+ g_assert_not_reached ();
}
- g_assert (buffer != NULL);
+ GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
+ stream, GST_PAD_NAME (stream->pad));
+ if (gst_data_queue_pop (stream->dataqueue, &item)) {
+ if (item->object)
+ object = gst_mini_object_ref (item->object);
+ item->destroy (item);
+ } else {
+ GST_DEBUG_OBJECT (mssdemux,
+ "Failed to get object from dataqueue on stream %p %s", stream,
+ GST_PAD_NAME (stream->pad));
+ goto stop;
+ }
if (G_UNLIKELY (stream->pending_newsegment)) {
gst_pad_push_event (stream->pad, stream->pending_newsegment);
stream->pending_newsegment = NULL;
}
- GST_DEBUG_OBJECT (mssdemux, "Pushing buffer of size %u on pad %s",
- GST_BUFFER_SIZE (buffer), GST_PAD_NAME (stream->pad));
- ret = gst_pad_push (stream->pad, buffer);
+ if (G_LIKELY (GST_IS_BUFFER (object))) {
+ GST_DEBUG_OBJECT (mssdemux,
+ "Pushing buffer %p %" GST_TIME_FORMAT " on pad %s", object,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
+ GST_PAD_NAME (stream->pad));
+ ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
+ } else if (GST_IS_EVENT (object)) {
+ if (GST_EVENT_TYPE (object) == GST_EVENT_EOS)
+ stream->eos = TRUE;
+ GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object,
+ GST_PAD_NAME (stream->pad));
+ gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
+ } else {
+ g_return_if_reached ();
+ }
+
switch (ret) {
case GST_FLOW_UNEXPECTED:
goto eos; /* EOS ? */
@@ -887,22 +1136,25 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
break;
}
- gst_mss_stream_advance_fragment (stream->manifest_stream);
+ GST_LOG_OBJECT (mssdemux, "Stream loop end");
return;
eos:
{
- GstEvent *eos = gst_event_new_eos ();
- GST_DEBUG_OBJECT (mssdemux, "Pushing EOS on pad %s:%s",
- GST_DEBUG_PAD_NAME (stream->pad));
- gst_pad_push_event (stream->pad, eos);
- gst_task_stop (stream->stream_task);
+ GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
+ gst_task_stop (mssdemux->stream_task);
return;
}
error:
{
GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
- gst_task_stop (stream->stream_task);
+ gst_task_stop (mssdemux->stream_task);
+ return;
+ }
+stop:
+ {
+ GST_DEBUG_OBJECT (mssdemux, "Stopping streaming task");
+ gst_task_stop (mssdemux->stream_task);
return;
}
}
diff --git a/ext/smoothstreaming/gstmssdemux.h b/ext/smoothstreaming/gstmssdemux.h
index c279cc2b7..ceb471bda 100644
--- a/ext/smoothstreaming/gstmssdemux.h
+++ b/ext/smoothstreaming/gstmssdemux.h
@@ -25,6 +25,7 @@
#include <gst/gst.h>
#include <gst/base/gstadapter.h>
+#include <gst/base/gstdataqueue.h>
#include "gstmssmanifest.h"
#include "gsturidownloader.h"
@@ -58,13 +59,15 @@ struct _GstMssDemuxStream {
GstMssStream *manifest_stream;
GstUriDownloader *downloader;
+ GstDataQueue *dataqueue;
GstEvent *pending_newsegment;
- /* Streaming task */
- GstTask *stream_task;
- GStaticRecMutex stream_lock;
+ /* Downloading task */
+ GstTask *download_task;
+ GStaticRecMutex download_lock;
+ gboolean eos;
};
struct _GstMssDemux {
@@ -84,8 +87,13 @@ struct _GstMssDemux {
gboolean update_bitrates;
+ /* Streaming task */
+ GstTask *stream_task;
+ GStaticRecMutex stream_lock;
+
/* properties */
guint64 connection_speed; /* in bps */
+ guint data_queue_max_size;
};
struct _GstMssDemuxClass {