From aa58a70d6676f9bc394780a90a39ff47d538fa68 Mon Sep 17 00:00:00 2001 From: Florin Apostol Date: Tue, 16 Feb 2016 14:44:27 +0000 Subject: adaptivedemux: use realtime_clock for waiting for a condition There are several places in adaptivedemux where it waits for time to pass, for example to wait until it should next download a fragment. The problem with this approach is that it means that unit tests are forced to execute in realtime. This commit replaces the use of g_cond_wait_until() with single shot GstClockID that signals the condition variable. Under normal usage, this behaves exactly as before. A unit test can replace the system clock with a GstTestClock, allowing the test to control the timing in adaptivedemux. https://bugzilla.gnome.org/show_bug.cgi?id=762147 --- gst-libs/gst/adaptivedemux/gstadaptivedemux.c | 103 ++++++++++++++++++++------ 1 file changed, 80 insertions(+), 23 deletions(-) (limited to 'gst-libs') diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c index df5ace396..9c2bb444a 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c @@ -201,6 +201,14 @@ struct _GstAdaptiveDemuxPrivate GMutex segment_lock; }; +typedef struct _GstAdaptiveDemuxTimer +{ + GCond *cond; + GMutex *mutex; + GstClockID clock_id; + gboolean fired; +} GstAdaptiveDemuxTimer; + static GstBinClass *parent_class = NULL; static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass); static void gst_adaptive_demux_init (GstAdaptiveDemux * dec, @@ -271,6 +279,11 @@ gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux, static GstFlowReturn gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstClockTime duration); +static gboolean +gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex, + GstClockTime end_time); +static gboolean gst_adaptive_demux_clock_callback (GstClock * clock, + GstClockTime time, GstClockID id, gpointer user_data); /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init @@ -2701,8 +2714,7 @@ static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream) { GstAdaptiveDemux *demux = stream->demux; - guint64 next_download = - GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)); + GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux); GstFlowReturn ret; gboolean live; @@ -2844,9 +2856,8 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream) gint64 wait_time = gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream); if (wait_time > 0) { - gint64 end_time = - GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) - + wait_time / GST_USECOND; + GstClockTime end_time = + gst_adaptive_demux_get_monotonic_time (demux) + wait_time; GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT, GST_TIME_ARGS (wait_time)); @@ -2860,8 +2871,9 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream) stream->last_ret = GST_FLOW_FLUSHING; goto cancelled; } - g_cond_wait_until (&stream->fragment_download_cond, - &stream->fragment_download_lock, end_time); + gst_adaptive_demux_wait_until (demux->realtime_clock, + &stream->fragment_download_cond, &stream->fragment_download_lock, + end_time); g_mutex_unlock (&stream->fragment_download_lock); GST_DEBUG_OBJECT (stream->pad, "Download finished waiting"); @@ -2880,8 +2892,7 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream) stream->last_ret = GST_FLOW_OK; - next_download = - GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)); + next_download = gst_adaptive_demux_get_monotonic_time (demux); ret = gst_adaptive_demux_stream_download_fragment (stream); if (ret == GST_FLOW_FLUSHING) { @@ -2967,9 +2978,7 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream) } /* Wait half the fragment duration before retrying */ - next_download += - gst_util_uint64_scale - (stream->fragment.duration, G_USEC_PER_SEC, 2 * GST_SECOND); + next_download += stream->fragment.duration / 2; GST_MANIFEST_UNLOCK (demux); @@ -2980,8 +2989,9 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream) stream->last_ret = GST_FLOW_FLUSHING; goto cancelled; } - g_cond_wait_until (&stream->fragment_download_cond, - &stream->fragment_download_lock, next_download); + gst_adaptive_demux_wait_until (demux->realtime_clock, + &stream->fragment_download_cond, &stream->fragment_download_lock, + next_download); g_mutex_unlock (&stream->fragment_download_lock); GST_DEBUG_OBJECT (demux, "Retrying now"); @@ -3062,7 +3072,7 @@ download_error: static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux) { - gint64 next_update; + GstClockTime next_update; GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux); /* Loop for updating of the playlist. This periodically checks if @@ -3075,8 +3085,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux) GST_MANIFEST_LOCK (demux); next_update = - GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) + - klass->get_manifest_update_interval (demux); + gst_adaptive_demux_get_monotonic_time (demux) + + klass->get_manifest_update_interval (demux) * GST_USECOND; /* Updating playlist only needed for live playlists */ while (gst_adaptive_demux_is_live (demux)) { @@ -3092,7 +3102,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux) g_mutex_unlock (&demux->priv->updates_timed_lock); goto quit; } - g_cond_wait_until (&demux->priv->updates_timed_cond, + gst_adaptive_demux_wait_until (demux->realtime_clock, + &demux->priv->updates_timed_cond, &demux->priv->updates_timed_lock, next_update); g_mutex_unlock (&demux->priv->updates_timed_lock); @@ -3116,9 +3127,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux) demux->priv->update_failed_count++; if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) { GST_WARNING_OBJECT (demux, "Could not update the playlist"); - next_update = - GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) - + klass->get_manifest_update_interval (demux); + next_update = gst_adaptive_demux_get_monotonic_time (demux) + + klass->get_manifest_update_interval (demux) * GST_USECOND; } else { GST_ELEMENT_ERROR (demux, STREAM, FAILED, (_("Internal data stream error.")), ("Could not update playlist")); @@ -3130,8 +3140,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux) } else { GST_DEBUG_OBJECT (demux, "Updated playlist successfully"); next_update = - GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) + - klass->get_manifest_update_interval (demux); + gst_adaptive_demux_get_monotonic_time (demux) + + klass->get_manifest_update_interval (demux) * GST_USECOND; /* Wake up download tasks */ g_mutex_lock (&demux->priv->manifest_update_lock); @@ -3490,3 +3500,50 @@ gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux) gtv.tv_usec = utc_now % G_TIME_SPAN_SECOND; return g_date_time_new_from_timeval_utc (>v); } + +static gboolean +gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex, + GstClockTime end_time) +{ + GstAdaptiveDemuxTimer timer; + GstClockReturn res; + + if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) { + /* for an invalid time, gst_clock_id_wait_async will try to call + * gst_adaptive_demux_clock_callback from the current thread. + * It still holds the mutex while doing that, so it will deadlock. + * g_cond_wait_until would return immediately with false, so we'll do the same. + */ + return FALSE; + } + timer.fired = FALSE; + timer.cond = cond; + timer.mutex = mutex; + timer.clock_id = gst_clock_new_single_shot_id (clock, end_time); + res = + gst_clock_id_wait_async (timer.clock_id, + gst_adaptive_demux_clock_callback, &timer, NULL); + /* clock does not support asynchronously wait. Assert and return */ + if (res == GST_CLOCK_UNSUPPORTED) { + gst_clock_id_unref (timer.clock_id); + g_return_val_if_reached (TRUE); + } + /* the gst_adaptive_demux_clock_callback will signal the + cond when the clock's single shot timer fires */ + g_cond_wait (cond, mutex); + gst_clock_id_unref (timer.clock_id); + return !timer.fired; +} + +static gboolean +gst_adaptive_demux_clock_callback (GstClock * clock, + GstClockTime time, GstClockID id, gpointer user_data) +{ + GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data; + g_return_val_if_fail (timer != NULL, FALSE); + g_mutex_lock (timer->mutex); + timer->fired = TRUE; + g_cond_signal (timer->cond); + g_mutex_unlock (timer->mutex); + return TRUE; +} -- cgit v1.2.1