summaryrefslogtreecommitdiff
path: root/gst-libs
diff options
context:
space:
mode:
authorFlorin Apostol <florin.apostol@oregan.net>2016-02-16 14:44:27 +0000
committerThiago Santos <thiagoss@osg.samsung.com>2016-04-21 16:46:09 -0300
commitaa58a70d6676f9bc394780a90a39ff47d538fa68 (patch)
treed6c0a84efd5195a0e2f355aae8b5a225e7ef7176 /gst-libs
parent74d62b91449b2f21d417bc67b58792217c185b8d (diff)
downloadgstreamer-plugins-bad-aa58a70d6676f9bc394780a90a39ff47d538fa68.tar.gz
adaptivedemux: use realtime_clock for waiting for a condition
There are several places in adaptivedemux where it waits for time to pass, for example to wait until it should next download a fragment. The problem with this approach is that it means that unit tests are forced to execute in realtime. This commit replaces the use of g_cond_wait_until() with single shot GstClockID that signals the condition variable. Under normal usage, this behaves exactly as before. A unit test can replace the system clock with a GstTestClock, allowing the test to control the timing in adaptivedemux. https://bugzilla.gnome.org/show_bug.cgi?id=762147
Diffstat (limited to 'gst-libs')
-rw-r--r--gst-libs/gst/adaptivedemux/gstadaptivedemux.c103
1 files changed, 80 insertions, 23 deletions
diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
index df5ace396..9c2bb444a 100644
--- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
+++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
@@ -201,6 +201,14 @@ struct _GstAdaptiveDemuxPrivate
GMutex segment_lock;
};
+typedef struct _GstAdaptiveDemuxTimer
+{
+ GCond *cond;
+ GMutex *mutex;
+ GstClockID clock_id;
+ gboolean fired;
+} GstAdaptiveDemuxTimer;
+
static GstBinClass *parent_class = NULL;
static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
@@ -271,6 +279,11 @@ gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
static GstFlowReturn
gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstClockTime duration);
+static gboolean
+gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
+ GstClockTime end_time);
+static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
+ GstClockTime time, GstClockID id, gpointer user_data);
/* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
@@ -2701,8 +2714,7 @@ static void
gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
{
GstAdaptiveDemux *demux = stream->demux;
- guint64 next_download =
- GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
+ GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
GstFlowReturn ret;
gboolean live;
@@ -2844,9 +2856,8 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
gint64 wait_time =
gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
if (wait_time > 0) {
- gint64 end_time =
- GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux))
- + wait_time / GST_USECOND;
+ GstClockTime end_time =
+ gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
GST_TIME_ARGS (wait_time));
@@ -2860,8 +2871,9 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
stream->last_ret = GST_FLOW_FLUSHING;
goto cancelled;
}
- g_cond_wait_until (&stream->fragment_download_cond,
- &stream->fragment_download_lock, end_time);
+ gst_adaptive_demux_wait_until (demux->realtime_clock,
+ &stream->fragment_download_cond, &stream->fragment_download_lock,
+ end_time);
g_mutex_unlock (&stream->fragment_download_lock);
GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
@@ -2880,8 +2892,7 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
stream->last_ret = GST_FLOW_OK;
- next_download =
- GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
+ next_download = gst_adaptive_demux_get_monotonic_time (demux);
ret = gst_adaptive_demux_stream_download_fragment (stream);
if (ret == GST_FLOW_FLUSHING) {
@@ -2967,9 +2978,7 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
}
/* Wait half the fragment duration before retrying */
- next_download +=
- gst_util_uint64_scale
- (stream->fragment.duration, G_USEC_PER_SEC, 2 * GST_SECOND);
+ next_download += stream->fragment.duration / 2;
GST_MANIFEST_UNLOCK (demux);
@@ -2980,8 +2989,9 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
stream->last_ret = GST_FLOW_FLUSHING;
goto cancelled;
}
- g_cond_wait_until (&stream->fragment_download_cond,
- &stream->fragment_download_lock, next_download);
+ gst_adaptive_demux_wait_until (demux->realtime_clock,
+ &stream->fragment_download_cond, &stream->fragment_download_lock,
+ next_download);
g_mutex_unlock (&stream->fragment_download_lock);
GST_DEBUG_OBJECT (demux, "Retrying now");
@@ -3062,7 +3072,7 @@ download_error:
static void
gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
{
- gint64 next_update;
+ GstClockTime next_update;
GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* Loop for updating of the playlist. This periodically checks if
@@ -3075,8 +3085,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
GST_MANIFEST_LOCK (demux);
next_update =
- GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) +
- klass->get_manifest_update_interval (demux);
+ gst_adaptive_demux_get_monotonic_time (demux) +
+ klass->get_manifest_update_interval (demux) * GST_USECOND;
/* Updating playlist only needed for live playlists */
while (gst_adaptive_demux_is_live (demux)) {
@@ -3092,7 +3102,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
g_mutex_unlock (&demux->priv->updates_timed_lock);
goto quit;
}
- g_cond_wait_until (&demux->priv->updates_timed_cond,
+ gst_adaptive_demux_wait_until (demux->realtime_clock,
+ &demux->priv->updates_timed_cond,
&demux->priv->updates_timed_lock, next_update);
g_mutex_unlock (&demux->priv->updates_timed_lock);
@@ -3116,9 +3127,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
demux->priv->update_failed_count++;
if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
GST_WARNING_OBJECT (demux, "Could not update the playlist");
- next_update =
- GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux))
- + klass->get_manifest_update_interval (demux);
+ next_update = gst_adaptive_demux_get_monotonic_time (demux)
+ + klass->get_manifest_update_interval (demux) * GST_USECOND;
} else {
GST_ELEMENT_ERROR (demux, STREAM, FAILED,
(_("Internal data stream error.")), ("Could not update playlist"));
@@ -3130,8 +3140,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
} else {
GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
next_update =
- GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) +
- klass->get_manifest_update_interval (demux);
+ gst_adaptive_demux_get_monotonic_time (demux) +
+ klass->get_manifest_update_interval (demux) * GST_USECOND;
/* Wake up download tasks */
g_mutex_lock (&demux->priv->manifest_update_lock);
@@ -3490,3 +3500,50 @@ gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
gtv.tv_usec = utc_now % G_TIME_SPAN_SECOND;
return g_date_time_new_from_timeval_utc (&gtv);
}
+
+static gboolean
+gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
+ GstClockTime end_time)
+{
+ GstAdaptiveDemuxTimer timer;
+ GstClockReturn res;
+
+ if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
+ /* for an invalid time, gst_clock_id_wait_async will try to call
+ * gst_adaptive_demux_clock_callback from the current thread.
+ * It still holds the mutex while doing that, so it will deadlock.
+ * g_cond_wait_until would return immediately with false, so we'll do the same.
+ */
+ return FALSE;
+ }
+ timer.fired = FALSE;
+ timer.cond = cond;
+ timer.mutex = mutex;
+ timer.clock_id = gst_clock_new_single_shot_id (clock, end_time);
+ res =
+ gst_clock_id_wait_async (timer.clock_id,
+ gst_adaptive_demux_clock_callback, &timer, NULL);
+ /* clock does not support asynchronously wait. Assert and return */
+ if (res == GST_CLOCK_UNSUPPORTED) {
+ gst_clock_id_unref (timer.clock_id);
+ g_return_val_if_reached (TRUE);
+ }
+ /* the gst_adaptive_demux_clock_callback will signal the
+ cond when the clock's single shot timer fires */
+ g_cond_wait (cond, mutex);
+ gst_clock_id_unref (timer.clock_id);
+ return !timer.fired;
+}
+
+static gboolean
+gst_adaptive_demux_clock_callback (GstClock * clock,
+ GstClockTime time, GstClockID id, gpointer user_data)
+{
+ GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
+ g_return_val_if_fail (timer != NULL, FALSE);
+ g_mutex_lock (timer->mutex);
+ timer->fired = TRUE;
+ g_cond_signal (timer->cond);
+ g_mutex_unlock (timer->mutex);
+ return TRUE;
+}