adaptivedemux: use realtime_clock for waiting for a condition

There are several places in adaptivedemux where it waits for
time to pass, for example to wait until it should next download
a fragment. The problem with this approach is that it means that
unit tests are forced to execute in realtime.

This commit replaces the use of g_cond_wait_until() with single
shot GstClockID that signals the condition variable. Under normal
usage, this behaves exactly as before. A unit test can replace the
system clock with a GstTestClock, allowing the test to control the
timing in adaptivedemux.

https://bugzilla.gnome.org/show_bug.cgi?id=762147
This commit is contained in:
Florin Apostol 2016-02-16 14:44:27 +00:00 committed by Thiago Santos
parent 74d62b9144
commit aa58a70d66

View file

@ -201,6 +201,14 @@ struct _GstAdaptiveDemuxPrivate
GMutex segment_lock;
};
typedef struct _GstAdaptiveDemuxTimer
{
GCond *cond;
GMutex *mutex;
GstClockID clock_id;
gboolean fired;
} GstAdaptiveDemuxTimer;
static GstBinClass *parent_class = NULL;
static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
@ -271,6 +279,11 @@ gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
static GstFlowReturn
gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstClockTime duration);
static gboolean
gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
GstClockTime end_time);
static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
GstClockTime time, GstClockID id, gpointer user_data);
/* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
@ -2701,8 +2714,7 @@ static void
gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
{
GstAdaptiveDemux *demux = stream->demux;
guint64 next_download =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
GstFlowReturn ret;
gboolean live;
@ -2844,9 +2856,8 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
gint64 wait_time =
gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
if (wait_time > 0) {
gint64 end_time =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux))
+ wait_time / GST_USECOND;
GstClockTime end_time =
gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
GST_TIME_ARGS (wait_time));
@ -2860,8 +2871,9 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
stream->last_ret = GST_FLOW_FLUSHING;
goto cancelled;
}
g_cond_wait_until (&stream->fragment_download_cond,
&stream->fragment_download_lock, end_time);
gst_adaptive_demux_wait_until (demux->realtime_clock,
&stream->fragment_download_cond, &stream->fragment_download_lock,
end_time);
g_mutex_unlock (&stream->fragment_download_lock);
GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
@ -2880,8 +2892,7 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
stream->last_ret = GST_FLOW_OK;
next_download =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
next_download = gst_adaptive_demux_get_monotonic_time (demux);
ret = gst_adaptive_demux_stream_download_fragment (stream);
if (ret == GST_FLOW_FLUSHING) {
@ -2967,9 +2978,7 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
}
/* Wait half the fragment duration before retrying */
next_download +=
gst_util_uint64_scale
(stream->fragment.duration, G_USEC_PER_SEC, 2 * GST_SECOND);
next_download += stream->fragment.duration / 2;
GST_MANIFEST_UNLOCK (demux);
@ -2980,8 +2989,9 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
stream->last_ret = GST_FLOW_FLUSHING;
goto cancelled;
}
g_cond_wait_until (&stream->fragment_download_cond,
&stream->fragment_download_lock, next_download);
gst_adaptive_demux_wait_until (demux->realtime_clock,
&stream->fragment_download_cond, &stream->fragment_download_lock,
next_download);
g_mutex_unlock (&stream->fragment_download_lock);
GST_DEBUG_OBJECT (demux, "Retrying now");
@ -3062,7 +3072,7 @@ download_error:
static void
gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
{
gint64 next_update;
GstClockTime next_update;
GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* Loop for updating of the playlist. This periodically checks if
@ -3075,8 +3085,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
GST_MANIFEST_LOCK (demux);
next_update =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) +
klass->get_manifest_update_interval (demux);
gst_adaptive_demux_get_monotonic_time (demux) +
klass->get_manifest_update_interval (demux) * GST_USECOND;
/* Updating playlist only needed for live playlists */
while (gst_adaptive_demux_is_live (demux)) {
@ -3092,7 +3102,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
g_mutex_unlock (&demux->priv->updates_timed_lock);
goto quit;
}
g_cond_wait_until (&demux->priv->updates_timed_cond,
gst_adaptive_demux_wait_until (demux->realtime_clock,
&demux->priv->updates_timed_cond,
&demux->priv->updates_timed_lock, next_update);
g_mutex_unlock (&demux->priv->updates_timed_lock);
@ -3116,9 +3127,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
demux->priv->update_failed_count++;
if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
GST_WARNING_OBJECT (demux, "Could not update the playlist");
next_update =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux))
+ klass->get_manifest_update_interval (demux);
next_update = gst_adaptive_demux_get_monotonic_time (demux)
+ klass->get_manifest_update_interval (demux) * GST_USECOND;
} else {
GST_ELEMENT_ERROR (demux, STREAM, FAILED,
(_("Internal data stream error.")), ("Could not update playlist"));
@ -3130,8 +3140,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
} else {
GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
next_update =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) +
klass->get_manifest_update_interval (demux);
gst_adaptive_demux_get_monotonic_time (demux) +
klass->get_manifest_update_interval (demux) * GST_USECOND;
/* Wake up download tasks */
g_mutex_lock (&demux->priv->manifest_update_lock);
@ -3490,3 +3500,50 @@ gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
gtv.tv_usec = utc_now % G_TIME_SPAN_SECOND;
return g_date_time_new_from_timeval_utc (&gtv);
}
static gboolean
gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
GstClockTime end_time)
{
GstAdaptiveDemuxTimer timer;
GstClockReturn res;
if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
/* for an invalid time, gst_clock_id_wait_async will try to call
* gst_adaptive_demux_clock_callback from the current thread.
* It still holds the mutex while doing that, so it will deadlock.
* g_cond_wait_until would return immediately with false, so we'll do the same.
*/
return FALSE;
}
timer.fired = FALSE;
timer.cond = cond;
timer.mutex = mutex;
timer.clock_id = gst_clock_new_single_shot_id (clock, end_time);
res =
gst_clock_id_wait_async (timer.clock_id,
gst_adaptive_demux_clock_callback, &timer, NULL);
/* clock does not support asynchronously wait. Assert and return */
if (res == GST_CLOCK_UNSUPPORTED) {
gst_clock_id_unref (timer.clock_id);
g_return_val_if_reached (TRUE);
}
/* the gst_adaptive_demux_clock_callback will signal the
cond when the clock's single shot timer fires */
g_cond_wait (cond, mutex);
gst_clock_id_unref (timer.clock_id);
return !timer.fired;
}
static gboolean
gst_adaptive_demux_clock_callback (GstClock * clock,
GstClockTime time, GstClockID id, gpointer user_data)
{
GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
g_return_val_if_fail (timer != NULL, FALSE);
g_mutex_lock (timer->mutex);
timer->fired = TRUE;
g_cond_signal (timer->cond);
g_mutex_unlock (timer->mutex);
return TRUE;
}