adaptivedemux: use GstSystemClock to all real-time calculations

A realtime clock is used in many places, such as deciding which
fragment to select at start up and deciding how long to sleep
before a fragment becomes available. For example dashdemux needs
sample the client's estimate of UTC when selecting where to start
in a live DASH stream.

The problem with dashdemux calculating the client's idea of UTC is
that it makes it difficult to create unit tests, because the passage
of time is a factor in the test.

This commit changes dashdemux and adaptivedemux to use the
GstSystemClock, so that a unit test can replace the system clock when
it needs to be able to control the clock.

This commit makes no change to the behaviour under normal usage, as
GstSystemClock is based upon the system time.

https://bugzilla.gnome.org/show_bug.cgi?id=762147
This commit is contained in:
Florin Apostol 2016-02-16 14:44:10 +00:00 committed by Thiago Santos
parent b66a9acdb8
commit 74d62b9144
3 changed files with 117 additions and 23 deletions

View file

@ -266,7 +266,8 @@ static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
GstActiveStream * stream);
static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux,
GstActiveStream * stream);
static GstDashDemuxClockDrift *gst_dash_demux_clock_drift_new (void);
static GstDashDemuxClockDrift *gst_dash_demux_clock_drift_new (GstDashDemux *
demux);
static void gst_dash_demux_clock_drift_free (GstDashDemuxClockDrift *);
static gboolean gst_dash_demux_poll_clock_drift (GstDashDemux * demux);
static GTimeSpan gst_dash_demux_get_clock_compensation (GstDashDemux * demux);
@ -711,7 +712,7 @@ gst_dash_demux_setup_streams (GstAdaptiveDemux * demux)
SUPPORTED_CLOCK_FORMATS, NULL);
if (urls) {
GST_DEBUG_OBJECT (dashdemux, "Found a supported UTCTiming element");
dashdemux->clock_drift = gst_dash_demux_clock_drift_new ();
dashdemux->clock_drift = gst_dash_demux_clock_drift_new (dashdemux);
gst_dash_demux_poll_clock_drift (dashdemux);
}
}
@ -1540,8 +1541,12 @@ gst_dash_demux_stream_get_fragment_waiting_time (GstAdaptiveDemuxStream *
gint64 diff;
GstDateTime *cur_time;
cur_time = gst_date_time_new_now_utc ();
diff = gst_mpd_client_calculate_time_difference (cur_time,
cur_time =
gst_date_time_new_from_g_date_time
(gst_adaptive_demux_get_client_now_utc (GST_ADAPTIVE_DEMUX_CAST
(dashdemux)));
diff =
gst_mpd_client_calculate_time_difference (cur_time,
segmentAvailability);
gst_date_time_unref (segmentAvailability);
gst_date_time_unref (cur_time);
@ -1725,13 +1730,15 @@ gst_dash_demux_stream_free (GstAdaptiveDemuxStream * stream)
}
static GstDashDemuxClockDrift *
gst_dash_demux_clock_drift_new (void)
gst_dash_demux_clock_drift_new (GstDashDemux * demux)
{
GstDashDemuxClockDrift *clock_drift;
clock_drift = g_slice_new0 (GstDashDemuxClockDrift);
g_mutex_init (&clock_drift->clock_lock);
clock_drift->next_update = g_get_monotonic_time ();
clock_drift->next_update =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time
(GST_ADAPTIVE_DEMUX_CAST (demux)));
return clock_drift;
}
@ -2020,7 +2027,9 @@ gst_dash_demux_poll_clock_drift (GstDashDemux * demux)
g_return_val_if_fail (demux != NULL, FALSE);
g_return_val_if_fail (demux->clock_drift != NULL, FALSE);
clock_drift = demux->clock_drift;
now = g_get_monotonic_time ();
now =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time
(GST_ADAPTIVE_DEMUX_CAST (demux)));
if (now < clock_drift->next_update) {
/*TODO: If a fragment fails to download in adaptivedemux, it waits
for a manifest reload before another attempt to fetch a fragment.
@ -2050,7 +2059,8 @@ gst_dash_demux_poll_clock_drift (GstDashDemux * demux)
goto quit;
}
}
start = g_date_time_new_now_utc ();
start =
gst_adaptive_demux_get_client_now_utc (GST_ADAPTIVE_DEMUX_CAST (demux));
if (!value) {
GstFragment *download;
gint64 range_start = 0, range_end = -1;
@ -2078,7 +2088,7 @@ gst_dash_demux_poll_clock_drift (GstDashDemux * demux)
urls[clock_drift->selected_url]);
goto quit;
}
end = g_date_time_new_now_utc ();
end = gst_adaptive_demux_get_client_now_utc (GST_ADAPTIVE_DEMUX_CAST (demux));
if (!value && method == GST_MPD_UTCTIMING_TYPE_HTTP_NTP) {
value = gst_dash_demux_parse_http_ntp (clock_drift, buffer);
} else if (!value) {
@ -2155,8 +2165,13 @@ gst_dash_demux_get_clock_compensation (GstDashDemux * demux)
static GDateTime *
gst_dash_demux_get_server_now_utc (GstDashDemux * demux)
{
GDateTime *client_now = g_date_time_new_now_utc ();
GDateTime *server_now = g_date_time_add (client_now,
GDateTime *client_now;
GDateTime *server_now;
client_now =
gst_adaptive_demux_get_client_now_utc (GST_ADAPTIVE_DEMUX_CAST (demux));
server_now =
g_date_time_add (client_now,
gst_dash_demux_get_clock_compensation (demux));
g_date_time_unref (client_now);
return server_now;

View file

@ -400,6 +400,8 @@ gst_adaptive_demux_init (GstAdaptiveDemux * demux,
GstAdaptiveDemuxClass * klass)
{
GstPadTemplate *pad_template;
GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
GObjectClass *gobject_class;
GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
@ -413,6 +415,30 @@ gst_adaptive_demux_init (GstAdaptiveDemux * demux,
gst_segment_init (&demux->segment, GST_FORMAT_TIME);
demux->realtime_clock = gst_system_clock_obtain ();
g_assert (demux->realtime_clock != NULL);
gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
if (g_object_class_find_property (gobject_class, "clock-type")) {
g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
} else {
GST_WARNING_OBJECT (demux,
"System clock does not have clock-type property");
}
if (clock_type == GST_CLOCK_TYPE_REALTIME) {
demux->clock_offset = 0;
} else {
GDateTime *utc_now;
GstClockTime rtc_now;
GTimeVal gtv;
utc_now = g_date_time_new_now_utc ();
rtc_now = gst_clock_get_time (demux->realtime_clock);
g_date_time_to_timeval (utc_now, &gtv);
demux->clock_offset =
gtv.tv_sec * G_TIME_SPAN_SECOND + gtv.tv_usec -
GST_TIME_AS_USECONDS (rtc_now);
g_date_time_unref (utc_now);
}
g_rec_mutex_init (&demux->priv->updates_lock);
demux->priv->updates_task =
gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
@ -466,6 +492,10 @@ gst_adaptive_demux_finalize (GObject * object)
g_rec_mutex_clear (&demux->priv->manifest_lock);
g_mutex_clear (&demux->priv->api_lock);
g_mutex_clear (&demux->priv->segment_lock);
if (demux->realtime_clock) {
gst_object_unref (demux->realtime_clock);
demux->realtime_clock = NULL;
}
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -2070,7 +2100,8 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
}
stream->download_total_time +=
g_get_monotonic_time () - stream->download_chunk_start_time;
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) -
stream->download_chunk_start_time;
stream->download_total_bytes += gst_buffer_get_size (buffer);
gst_adapter_push (stream->adapter, buffer);
@ -2091,7 +2122,8 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
g_mutex_unlock (&stream->fragment_download_lock);
}
stream->download_chunk_start_time = g_get_monotonic_time ();
stream->download_chunk_start_time =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
if (ret != GST_FLOW_OK) {
if (ret < GST_FLOW_EOS) {
@ -2463,8 +2495,9 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
}
if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
stream->download_start_time = g_get_monotonic_time ();
stream->download_chunk_start_time = g_get_monotonic_time ();
stream->download_start_time =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
stream->download_chunk_start_time = stream->download_start_time;
/* src element is in state READY. Before we start it, we reset
* download_finished
@ -2668,7 +2701,8 @@ static void
gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
{
GstAdaptiveDemux *demux = stream->demux;
guint64 next_download = g_get_monotonic_time ();
guint64 next_download =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
GstFlowReturn ret;
gboolean live;
@ -2810,7 +2844,9 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
gint64 wait_time =
gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
if (wait_time > 0) {
gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
gint64 end_time =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux))
+ wait_time / GST_USECOND;
GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
GST_TIME_ARGS (wait_time));
@ -2844,7 +2880,8 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
stream->last_ret = GST_FLOW_OK;
next_download = g_get_monotonic_time ();
next_download =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
ret = gst_adaptive_demux_stream_download_fragment (stream);
if (ret == GST_FLOW_FLUSHING) {
@ -3037,7 +3074,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
GST_MANIFEST_LOCK (demux);
next_update = g_get_monotonic_time () +
next_update =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) +
klass->get_manifest_update_interval (demux);
/* Updating playlist only needed for live playlists */
@ -3078,8 +3116,9 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
demux->priv->update_failed_count++;
if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
GST_WARNING_OBJECT (demux, "Could not update the playlist");
next_update = g_get_monotonic_time () +
klass->get_manifest_update_interval (demux);
next_update =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux))
+ klass->get_manifest_update_interval (demux);
} else {
GST_ELEMENT_ERROR (demux, STREAM, FAILED,
(_("Internal data stream error.")), ("Could not update playlist"));
@ -3090,7 +3129,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
}
} else {
GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
next_update = g_get_monotonic_time () +
next_update =
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) +
klass->get_manifest_update_interval (demux);
/* Wake up download tasks */
@ -3238,7 +3278,7 @@ gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
}
stream->download_start_time = stream->download_chunk_start_time =
g_get_monotonic_time ();
GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
if (ret == GST_FLOW_OK) {
if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
@ -3418,3 +3458,35 @@ gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
gst_adaptive_demux_expose_streams (demux, FALSE);
gst_adaptive_demux_start_tasks (demux);
}
/**
* gst_adaptive_demux_get_monotonic_time:
* Returns: a monotonically increasing time, using the system realtime clock
*/
GstClockTime
gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
{
g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
return gst_clock_get_time (demux->realtime_clock);
}
/**
* gst_adaptive_demux_get_client_now_utc:
* @demux: #GstAdaptiveDemux
* Returns: the client's estimate of UTC
*
* Used to find the client's estimate of UTC, using the system realtime clock.
*/
GDateTime *
gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
{
GstClockTime rtc_now;
gint64 utc_now;
GTimeVal gtv;
rtc_now = gst_clock_get_time (demux->realtime_clock);
utc_now = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
gtv.tv_sec = utc_now / G_TIME_SPAN_SECOND;
gtv.tv_usec = utc_now % G_TIME_SPAN_SECOND;
return g_date_time_new_from_timeval_utc (&gtv);
}

View file

@ -205,6 +205,10 @@ struct _GstAdaptiveDemux
gboolean have_group_id;
guint group_id;
/* Realtime clock */
GstClock *realtime_clock;
gint64 clock_offset; /* offset between realtime_clock and UTC (in usec) */
/* < private > */
GstAdaptiveDemuxPrivate *priv;
};
@ -450,6 +454,9 @@ gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
void gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
GstEvent * event);
GstClockTime gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux);
GDateTime *gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux);
G_END_DECLS
#endif