hlsdemux: Refactor threading and downloading

We now download fragments as fast as possible and push them downstream
while another thread is just responsible for updating live playlists
every now and then.

This simplifies the code a lot and together with the new buffering
mode for adaptive streams in multiqueue makes streams start much faster.

Also simplify threading a bit and hopefully make the GstTask usage safer.
This commit is contained in:
Sebastian Dröge 2014-02-17 09:19:32 +01:00
parent 76e74547c7
commit a51116add3
4 changed files with 264 additions and 317 deletions

View file

@ -4,6 +4,7 @@
* Copyright (C) 2011, Hewlett-Packard Development Company, L.P. * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
* Author: Youness Alaoui <youness.alaoui@collabora.co.uk>, Collabora Ltd. * Author: Youness Alaoui <youness.alaoui@collabora.co.uk>, Collabora Ltd.
* Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd. * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
* Copyright (C) 2014 Sebastian Dröge <sebastian@centricular.com>
* *
* Gsthlsdemux.c: * Gsthlsdemux.c:
* *
@ -73,7 +74,7 @@ enum
PROP_LAST PROP_LAST
}; };
#define DEFAULT_FRAGMENTS_CACHE 3 #define DEFAULT_FRAGMENTS_CACHE 1
#define DEFAULT_FAILED_COUNT 3 #define DEFAULT_FAILED_COUNT 3
#define DEFAULT_BITRATE_LIMIT 0.8 #define DEFAULT_BITRATE_LIMIT 0.8
#define DEFAULT_CONNECTION_SPEED 0 #define DEFAULT_CONNECTION_SPEED 0
@ -101,12 +102,11 @@ static gboolean gst_hls_demux_src_query (GstPad * pad, GstObject * parent,
static void gst_hls_demux_stream_loop (GstHLSDemux * demux); static void gst_hls_demux_stream_loop (GstHLSDemux * demux);
static void gst_hls_demux_updates_loop (GstHLSDemux * demux); static void gst_hls_demux_updates_loop (GstHLSDemux * demux);
static void gst_hls_demux_stop (GstHLSDemux * demux); static void gst_hls_demux_stop (GstHLSDemux * demux);
static void gst_hls_demux_pause_tasks (GstHLSDemux * demux, gboolean caching); static void gst_hls_demux_pause_tasks (GstHLSDemux * demux);
static gboolean gst_hls_demux_cache_fragments (GstHLSDemux * demux); static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux,
static gboolean gst_hls_demux_schedule (GstHLSDemux * demux); GstFragment * fragment);
static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux); static GstFragment *gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean * end_of_playlist, GError ** err);
gboolean caching, GError ** err);
static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux, static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux,
gboolean update, GError ** err); gboolean update, GError ** err);
static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose); static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose);
@ -123,33 +123,13 @@ gst_hls_demux_dispose (GObject * obj)
GstHLSDemux *demux = GST_HLS_DEMUX (obj); GstHLSDemux *demux = GST_HLS_DEMUX (obj);
if (demux->stream_task) { if (demux->stream_task) {
if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
GST_DEBUG_OBJECT (demux, "Leaving streaming task");
gst_task_stop (demux->stream_task);
g_rec_mutex_lock (&demux->stream_lock);
g_rec_mutex_unlock (&demux->stream_lock);
gst_task_join (demux->stream_task);
}
gst_object_unref (demux->stream_task); gst_object_unref (demux->stream_task);
g_rec_mutex_clear (&demux->stream_lock); g_rec_mutex_clear (&demux->stream_lock);
demux->stream_task = NULL; demux->stream_task = NULL;
} }
if (demux->updates_task) { if (demux->updates_task) {
if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
GST_DEBUG_OBJECT (demux, "Leaving updates task");
demux->cancelled = TRUE;
gst_uri_downloader_cancel (demux->downloader);
gst_task_stop (demux->updates_task);
g_mutex_lock (&demux->updates_timed_lock);
GST_TASK_SIGNAL (demux->updates_task);
g_rec_mutex_lock (&demux->updates_lock);
g_rec_mutex_unlock (&demux->updates_lock);
g_mutex_unlock (&demux->updates_timed_lock);
gst_task_join (demux->updates_task);
}
gst_object_unref (demux->updates_task); gst_object_unref (demux->updates_task);
g_mutex_clear (&demux->updates_timed_lock);
g_rec_mutex_clear (&demux->updates_lock); g_rec_mutex_clear (&demux->updates_lock);
demux->updates_task = NULL; demux->updates_task = NULL;
} }
@ -161,7 +141,9 @@ gst_hls_demux_dispose (GObject * obj)
gst_hls_demux_reset (demux, TRUE); gst_hls_demux_reset (demux, TRUE);
g_queue_free (demux->queue); g_mutex_clear (&demux->download_lock);
g_cond_clear (&demux->download_cond);
g_cond_clear (&demux->updates_timed_cond);
G_OBJECT_CLASS (parent_class)->dispose (obj); G_OBJECT_CLASS (parent_class)->dispose (obj);
} }
@ -238,14 +220,15 @@ gst_hls_demux_init (GstHLSDemux * demux)
demux->bitrate_limit = DEFAULT_BITRATE_LIMIT; demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
demux->connection_speed = DEFAULT_CONNECTION_SPEED; demux->connection_speed = DEFAULT_CONNECTION_SPEED;
demux->queue = g_queue_new (); g_mutex_init (&demux->download_lock);
g_cond_init (&demux->download_cond);
g_cond_init (&demux->updates_timed_cond);
/* Updates task */ /* Updates task */
g_rec_mutex_init (&demux->updates_lock); g_rec_mutex_init (&demux->updates_lock);
demux->updates_task = demux->updates_task =
gst_task_new ((GstTaskFunction) gst_hls_demux_updates_loop, demux, NULL); gst_task_new ((GstTaskFunction) gst_hls_demux_updates_loop, demux, NULL);
gst_task_set_lock (demux->updates_task, &demux->updates_lock); gst_task_set_lock (demux->updates_task, &demux->updates_lock);
g_mutex_init (&demux->updates_timed_lock);
/* Streaming task */ /* Streaming task */
g_rec_mutex_init (&demux->stream_lock); g_rec_mutex_init (&demux->stream_lock);
@ -320,10 +303,9 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition)
switch (transition) { switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY: case GST_STATE_CHANGE_PAUSED_TO_READY:
demux->cancelled = TRUE;
gst_hls_demux_stop (demux); gst_hls_demux_stop (demux);
gst_task_join (demux->stream_task);
gst_task_join (demux->updates_task); gst_task_join (demux->updates_task);
gst_task_join (demux->stream_task);
gst_hls_demux_reset (demux, FALSE); gst_hls_demux_reset (demux, FALSE);
break; break;
default: default:
@ -396,26 +378,13 @@ gst_hls_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
gst_pad_push_event (demux->srcpad, gst_event_new_flush_start ()); gst_pad_push_event (demux->srcpad, gst_event_new_flush_start ());
} }
demux->cancelled = TRUE; gst_hls_demux_pause_tasks (demux);
gst_task_pause (demux->stream_task);
gst_uri_downloader_cancel (demux->downloader);
gst_task_stop (demux->updates_task);
g_mutex_lock (&demux->updates_timed_lock);
GST_TASK_SIGNAL (demux->updates_task);
g_mutex_unlock (&demux->updates_timed_lock);
g_rec_mutex_lock (&demux->updates_lock);
g_rec_mutex_unlock (&demux->updates_lock);
gst_task_pause (demux->stream_task);
/* wait for streaming to finish */ /* wait for streaming to finish */
g_rec_mutex_lock (&demux->stream_lock); g_rec_mutex_lock (&demux->updates_lock);
g_rec_mutex_unlock (&demux->updates_lock);
demux->need_cache = TRUE; g_rec_mutex_lock (&demux->stream_lock);
while (!g_queue_is_empty (demux->queue)) {
GstFragment *fragment = g_queue_pop_head (demux->queue);
g_object_unref (fragment);
}
g_queue_clear (demux->queue);
GST_M3U8_CLIENT_LOCK (demux->client); GST_M3U8_CLIENT_LOCK (demux->client);
GST_DEBUG_OBJECT (demux, "seeking to sequence %d", current_sequence); GST_DEBUG_OBJECT (demux, "seeking to sequence %d", current_sequence);
@ -430,9 +399,11 @@ gst_hls_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
gst_pad_push_event (demux->srcpad, gst_event_new_flush_stop (TRUE)); gst_pad_push_event (demux->srcpad, gst_event_new_flush_stop (TRUE));
} }
demux->cancelled = FALSE; demux->stop_updates_task = FALSE;
gst_uri_downloader_reset (demux->downloader); gst_uri_downloader_reset (demux->downloader);
gst_task_start (demux->stream_task); demux->stop_stream_task = FALSE;
gst_task_start (demux->updates_task);
g_rec_mutex_unlock (&demux->stream_lock); g_rec_mutex_unlock (&demux->stream_lock);
return TRUE; return TRUE;
@ -497,7 +468,7 @@ gst_hls_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
return FALSE; return FALSE;
} }
gst_task_start (demux->stream_task); gst_task_start (demux->updates_task);
gst_event_unref (event); gst_event_unref (event);
return TRUE; return TRUE;
} }
@ -594,21 +565,22 @@ gst_hls_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
} }
static void static void
gst_hls_demux_pause_tasks (GstHLSDemux * demux, gboolean caching) gst_hls_demux_pause_tasks (GstHLSDemux * demux)
{ {
if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) { if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
demux->cancelled = TRUE; g_mutex_lock (&demux->updates_timed_lock);
demux->stop_updates_task = TRUE;
g_cond_signal (&demux->updates_timed_cond);
g_mutex_unlock (&demux->updates_timed_lock);
gst_uri_downloader_cancel (demux->downloader); gst_uri_downloader_cancel (demux->downloader);
gst_task_pause (demux->updates_task); gst_task_pause (demux->updates_task);
if (!caching)
g_mutex_lock (&demux->updates_timed_lock);
GST_TASK_SIGNAL (demux->updates_task);
if (!caching)
g_mutex_unlock (&demux->updates_timed_lock);
} }
if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) { if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
g_mutex_lock (&demux->download_lock);
demux->stop_stream_task = TRUE; demux->stop_stream_task = TRUE;
g_cond_signal (&demux->download_cond);
g_mutex_unlock (&demux->download_lock);
gst_task_pause (demux->stream_task); gst_task_pause (demux->stream_task);
} }
} }
@ -616,21 +588,22 @@ gst_hls_demux_pause_tasks (GstHLSDemux * demux, gboolean caching)
static void static void
gst_hls_demux_stop (GstHLSDemux * demux) gst_hls_demux_stop (GstHLSDemux * demux)
{ {
gst_uri_downloader_cancel (demux->downloader);
if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) { if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
demux->cancelled = TRUE; g_mutex_lock (&demux->updates_timed_lock);
demux->stop_updates_task = TRUE;
g_cond_signal (&demux->updates_timed_cond);
g_mutex_unlock (&demux->updates_timed_lock);
gst_uri_downloader_cancel (demux->downloader); gst_uri_downloader_cancel (demux->downloader);
gst_task_stop (demux->updates_task); gst_task_stop (demux->updates_task);
g_mutex_lock (&demux->updates_timed_lock);
GST_TASK_SIGNAL (demux->updates_task);
g_mutex_unlock (&demux->updates_timed_lock);
g_rec_mutex_lock (&demux->updates_lock); g_rec_mutex_lock (&demux->updates_lock);
g_rec_mutex_unlock (&demux->updates_lock); g_rec_mutex_unlock (&demux->updates_lock);
} }
if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) { if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
g_mutex_lock (&demux->download_lock);
demux->stop_stream_task = TRUE; demux->stop_stream_task = TRUE;
g_cond_signal (&demux->download_cond);
g_mutex_unlock (&demux->download_lock);
gst_task_stop (demux->stream_task); gst_task_stop (demux->stream_task);
g_rec_mutex_lock (&demux->stream_lock); g_rec_mutex_lock (&demux->stream_lock);
g_rec_mutex_unlock (&demux->stream_lock); g_rec_mutex_unlock (&demux->stream_lock);
@ -697,42 +670,110 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps)
static void static void
gst_hls_demux_stream_loop (GstHLSDemux * demux) gst_hls_demux_stream_loop (GstHLSDemux * demux)
{ {
GstFragment *fragment = NULL; GstFragment *fragment;
GstBuffer *buf; GstBuffer *buf;
GstFlowReturn ret; GstFlowReturn ret;
GstCaps *bufcaps, *srccaps = NULL; GstCaps *bufcaps, *srccaps = NULL;
gboolean end_of_playlist;
GError *err = NULL;
/* Loop for the source pad task. The task is started when we have /* This task will download fragments as fast as possible, sends
* received the main playlist from the source element. It tries first to * SEGMENT and CAPS events and switches pads if necessary.
* cache the first fragments and then it waits until it has more data in the * If downloading a fragment fails we try again up to 3 times
* queue. This task is woken up when we push a new fragment to the queue or * after waiting a bit. If we're at the end of the playlist
* when we reached the end of the playlist */ * we wait for the playlist to update before getting the next
* fragment.
*/
GST_DEBUG_OBJECT (demux, "Enter task"); GST_DEBUG_OBJECT (demux, "Enter task");
if (G_UNLIKELY (demux->need_cache)) { if (demux->stop_stream_task)
if (!gst_hls_demux_cache_fragments (demux)) goto pause_task;
goto cache_error;
/* Pop off the first fragment immediately so the demux->next_download = g_get_monotonic_time ();
* update task can get the next one already */ if ((fragment =
fragment = g_queue_pop_head (demux->queue); gst_hls_demux_get_next_fragment (demux, &end_of_playlist,
&err)) == NULL) {
if (demux->stop_stream_task) {
g_clear_error (&err);
goto pause_task;
}
/* we can start now the updates thread (only if on playing) */ if (end_of_playlist) {
gst_task_start (demux->updates_task); if (!gst_m3u8_client_is_live (demux->client)) {
GST_INFO_OBJECT (demux, "First fragments cached successfully"); GST_DEBUG_OBJECT (demux, "End of playlist");
demux->end_of_playlist = TRUE;
goto end_of_playlist;
} else {
g_mutex_lock (&demux->download_lock);
/* Wait until we're cancelled or there's something for
* us to download in the playlist or the playlist
* became non-live */
while (TRUE) {
if (demux->stop_stream_task) {
g_mutex_unlock (&demux->download_lock);
goto pause_task;
}
/* Got a new fragment or not live anymore? */
if (gst_m3u8_client_get_next_fragment (demux->client, NULL, NULL,
NULL, NULL, NULL, NULL)
|| !gst_m3u8_client_is_live (demux->client))
break;
GST_DEBUG_OBJECT (demux,
"No fragment left but live playlist, wait a bit");
g_cond_wait (&demux->download_cond, &demux->download_lock);
}
g_mutex_unlock (&demux->download_lock);
GST_DEBUG_OBJECT (demux, "Retrying now");
return;
}
} else {
demux->download_failed_count++;
if (demux->download_failed_count < DEFAULT_FAILED_COUNT) {
GST_WARNING_OBJECT (demux, "Could not fetch the next fragment");
g_clear_error (&err);
/* Wait half the fragment duration before retrying */
demux->next_download +=
gst_util_uint64_scale (gst_m3u8_client_get_current_fragment_duration
(demux->client), G_USEC_PER_SEC, 2 * GST_SECOND);
g_mutex_lock (&demux->download_lock);
if (demux->stop_stream_task) {
g_mutex_unlock (&demux->download_lock);
goto pause_task;
}
g_cond_wait_until (&demux->download_cond, &demux->download_lock,
demux->next_download);
g_mutex_unlock (&demux->download_lock);
GST_DEBUG_OBJECT (demux, "Retrying now");
return;
} else {
gst_element_post_message (GST_ELEMENT_CAST (demux),
gst_message_new_error (GST_OBJECT_CAST (demux), err,
"Could not fetch the next fragment"));
g_clear_error (&err);
goto pause_task;
}
}
} else {
demux->download_failed_count = 0;
gst_m3u8_client_advance_fragment (demux->client);
if (demux->stop_updates_task) {
g_object_unref (fragment);
goto pause_task;
}
/* try to switch to another bitrate if needed */
gst_hls_demux_switch_playlist (demux, fragment);
} }
if (!fragment && g_queue_is_empty (demux->queue)) { if (demux->stop_updates_task) {
if (demux->end_of_playlist) g_object_unref (fragment);
goto end_of_playlist;
goto pause_task; goto pause_task;
} }
/* If we didn't get our fragment above already */
if (!fragment)
fragment = g_queue_pop_head (demux->queue);
/* Figure out if we need to create/switch pads */ /* Figure out if we need to create/switch pads */
if (G_LIKELY (demux->srcpad)) if (G_LIKELY (demux->srcpad))
srccaps = gst_pad_get_current_caps (demux->srcpad); srccaps = gst_pad_get_current_caps (demux->srcpad);
@ -771,7 +812,8 @@ gst_hls_demux_stream_loop (GstHLSDemux * demux)
demux->position_shift = 0; demux->position_shift = 0;
} }
GST_DEBUG_OBJECT (demux, "Pushing buffer %p", buf); GST_DEBUG_OBJECT (demux, "Pushing buffer %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
ret = gst_pad_push (demux->srcpad, buf); ret = gst_pad_push (demux->srcpad, buf);
if (ret != GST_FLOW_OK) if (ret != GST_FLOW_OK)
@ -785,20 +827,7 @@ end_of_playlist:
{ {
GST_DEBUG_OBJECT (demux, "Reached end of playlist, sending EOS"); GST_DEBUG_OBJECT (demux, "Reached end of playlist, sending EOS");
gst_pad_push_event (demux->srcpad, gst_event_new_eos ()); gst_pad_push_event (demux->srcpad, gst_event_new_eos ());
gst_hls_demux_pause_tasks (demux, FALSE); gst_hls_demux_pause_tasks (demux);
return;
}
cache_error:
{
/* Pausing a stopped task will start it */
if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED)
gst_task_pause (demux->stream_task);
if (!demux->cancelled) {
GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
("Could not cache the first fragments"), (NULL));
gst_hls_demux_pause_tasks (demux, FALSE);
}
return; return;
} }
@ -806,7 +835,7 @@ type_not_found:
{ {
GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
("Could not determine type of stream"), (NULL)); ("Could not determine type of stream"), (NULL));
gst_hls_demux_pause_tasks (demux, FALSE); gst_hls_demux_pause_tasks (demux);
return; return;
} }
@ -820,7 +849,7 @@ error_pushing:
GST_DEBUG_OBJECT (demux, "stream stopped, reason %s", GST_DEBUG_OBJECT (demux, "stream stopped, reason %s",
gst_flow_get_name (ret)); gst_flow_get_name (ret));
} }
gst_hls_demux_pause_tasks (demux, FALSE); gst_hls_demux_pause_tasks (demux);
return; return;
} }
@ -828,8 +857,7 @@ pause_task:
{ {
GST_DEBUG_OBJECT (demux, "Pause task"); GST_DEBUG_OBJECT (demux, "Pause task");
/* Pausing a stopped task will start it */ /* Pausing a stopped task will start it */
if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) gst_hls_demux_pause_tasks (demux);
gst_task_pause (demux->stream_task);
return; return;
} }
} }
@ -837,11 +865,12 @@ pause_task:
static void static void
gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose) gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose)
{ {
demux->need_cache = TRUE;
demux->end_of_playlist = FALSE; demux->end_of_playlist = FALSE;
demux->cancelled = FALSE; demux->stop_updates_task = FALSE;
demux->do_typefind = TRUE; demux->do_typefind = TRUE;
demux->download_failed_count = 0;
g_free (demux->key_url); g_free (demux->key_url);
demux->key_url = NULL; demux->key_url = NULL;
@ -868,12 +897,6 @@ gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose)
demux->client = gst_m3u8_client_new (""); demux->client = gst_m3u8_client_new ("");
} }
while (!g_queue_is_empty (demux->queue)) {
GstFragment *fragment = g_queue_pop_head (demux->queue);
g_object_unref (fragment);
}
g_queue_clear (demux->queue);
demux->position_shift = 0; demux->position_shift = 0;
demux->need_segment = TRUE; demux->need_segment = TRUE;
@ -900,123 +923,14 @@ gst_hls_demux_set_location (GstHLSDemux * demux, const gchar * uri)
void void
gst_hls_demux_updates_loop (GstHLSDemux * demux) gst_hls_demux_updates_loop (GstHLSDemux * demux)
{ {
/* Loop for the updates. It's started when the first fragments are cached and /* Loop for updating of the playlist. This periodically checks if
* schedules the next update of the playlist (for lives sources) and the next * the playlist is updated and does so, then signals the streaming
* update of fragments. When a new fragment is downloaded, it compares the * thread in case it can continue downloading now.
* download time with the next scheduled update to check if we can or should * For non-live playlists this thread is not doing much else than
* switch to a different bitrate */ * setting up the initial playlist and then stopping. */
/* block until the next scheduled update or the signal to quit this thread */ /* block until the next scheduled update or the signal to quit this thread */
g_mutex_lock (&demux->updates_timed_lock);
GST_DEBUG_OBJECT (demux, "Started updates task"); GST_DEBUG_OBJECT (demux, "Started updates task");
while (TRUE) {
if (demux->cancelled)
goto quit;
/* fetch the next fragment */
if (g_queue_get_length (demux->queue) < demux->fragments_cache) {
GError *err = NULL;
GST_DEBUG_OBJECT (demux, "queue not full, get next fragment");
if (!gst_hls_demux_get_next_fragment (demux, FALSE, &err)) {
if (demux->cancelled) {
g_clear_error (&err);
goto quit;
} else if (!demux->end_of_playlist) {
demux->client->update_failed_count++;
if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
GST_WARNING_OBJECT (demux, "Could not fetch the next fragment");
g_clear_error (&err);
continue;
} else {
gst_element_post_message (GST_ELEMENT_CAST (demux),
gst_message_new_error (GST_OBJECT_CAST (demux), err,
"Could not fetch the next fragment"));
g_clear_error (&err);
goto error;
}
}
} else {
demux->client->update_failed_count = 0;
if (demux->cancelled)
goto quit;
/* try to switch to another bitrate if needed */
gst_hls_demux_switch_playlist (demux);
}
}
/* schedule the next update */
gst_hls_demux_schedule (demux);
/* block until the next scheduled update or the signal to quit this thread */
GST_DEBUG_OBJECT (demux, "Waiting");
if (g_cond_wait_until (GST_TASK_GET_COND (demux->updates_task),
&demux->updates_timed_lock, demux->next_update)) {
GST_DEBUG_OBJECT (demux, "Unlocked");
goto quit;
}
GST_DEBUG_OBJECT (demux, "Continue");
if (demux->cancelled)
goto quit;
/* update the playlist for live sources */
if (gst_m3u8_client_is_live (demux->client)) {
GError *err = NULL;
if (!gst_hls_demux_update_playlist (demux, TRUE, &err)) {
if (demux->cancelled)
goto quit;
demux->client->update_failed_count++;
if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
GST_WARNING_OBJECT (demux, "Could not update the playlist");
continue;
} else {
gst_element_post_message (GST_ELEMENT_CAST (demux),
gst_message_new_error (GST_OBJECT_CAST (demux), err,
"Could not update the playlist"));
g_error_free (err);
goto error;
}
}
}
/* if it's a live source and the playlist couldn't be updated, there aren't
* more fragments in the playlist, so we just wait for the next schedulled
* update */
if (gst_m3u8_client_is_live (demux->client) &&
demux->client->update_failed_count > 0) {
GST_WARNING_OBJECT (demux,
"The playlist hasn't been updated, failed count is %d",
demux->client->update_failed_count);
continue;
}
if (demux->cancelled)
goto quit;
}
quit:
{
GST_DEBUG_OBJECT (demux, "Stopped updates task");
g_mutex_unlock (&demux->updates_timed_lock);
return;
}
error:
{
GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
gst_hls_demux_pause_tasks (demux, TRUE);
g_mutex_unlock (&demux->updates_timed_lock);
}
}
static gboolean
gst_hls_demux_cache_fragments (GstHLSDemux * demux)
{
gint i;
/* If this playlist is a variant playlist, select the first one /* If this playlist is a variant playlist, select the first one
* and update it */ * and update it */
@ -1025,7 +939,6 @@ gst_hls_demux_cache_fragments (GstHLSDemux * demux)
GError *err = NULL; GError *err = NULL;
if (demux->connection_speed == 0) { if (demux->connection_speed == 0) {
GST_M3U8_CLIENT_LOCK (demux->client); GST_M3U8_CLIENT_LOCK (demux->client);
child = demux->client->main->current_variant->data; child = demux->client->main->current_variant->data;
GST_M3U8_CLIENT_UNLOCK (demux->client); GST_M3U8_CLIENT_UNLOCK (demux->client);
@ -1042,7 +955,7 @@ gst_hls_demux_cache_fragments (GstHLSDemux * demux)
gst_message_new_error (GST_OBJECT_CAST (demux), err, gst_message_new_error (GST_OBJECT_CAST (demux), err,
"Could not fetch the child playlist")); "Could not fetch the child playlist"));
g_error_free (err); g_error_free (err);
return FALSE; goto error;
} }
} }
@ -1056,39 +969,76 @@ gst_hls_demux_cache_fragments (GstHLSDemux * demux)
gst_message_new_duration_changed (GST_OBJECT (demux))); gst_message_new_duration_changed (GST_OBJECT (demux)));
} }
/* Cache the first fragments */ /* Now start stream task */
for (i = 0; i < demux->fragments_cache; i++) { gst_task_start (demux->stream_task);
demux->next_update =
g_get_monotonic_time () +
gst_util_uint64_scale (gst_m3u8_client_get_target_duration
(demux->client), G_USEC_PER_SEC, GST_SECOND);
/* Updating playlist only needed for live playlists */
while (gst_m3u8_client_is_live (demux->client)) {
GError *err = NULL; GError *err = NULL;
gst_element_post_message (GST_ELEMENT (demux), /* Wait here until we should do the next update or we're cancelled */
gst_message_new_buffering (GST_OBJECT (demux), GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
100 * i / demux->fragments_cache)); g_mutex_lock (&demux->updates_timed_lock);
demux->next_update = g_get_monotonic_time (); if (demux->stop_updates_task) {
if (!gst_hls_demux_get_next_fragment (demux, TRUE, &err)) { g_mutex_unlock (&demux->updates_timed_lock);
if (demux->end_of_playlist) goto quit;
break; }
if (!demux->cancelled) { g_cond_wait_until (&demux->updates_timed_cond, &demux->updates_timed_lock,
demux->next_update);
if (demux->stop_updates_task) {
g_mutex_unlock (&demux->updates_timed_lock);
goto quit;
}
g_mutex_unlock (&demux->updates_timed_lock);
GST_DEBUG_OBJECT (demux, "Updating playlist");
if (!gst_hls_demux_update_playlist (demux, TRUE, &err)) {
if (demux->stop_updates_task)
goto quit;
demux->client->update_failed_count++;
if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
GST_WARNING_OBJECT (demux, "Could not update the playlist");
demux->next_update =
g_get_monotonic_time () +
gst_util_uint64_scale (gst_m3u8_client_get_target_duration
(demux->client), G_USEC_PER_SEC, 2 * GST_SECOND);
} else {
gst_element_post_message (GST_ELEMENT_CAST (demux), gst_element_post_message (GST_ELEMENT_CAST (demux),
gst_message_new_error (GST_OBJECT_CAST (demux), err, gst_message_new_error (GST_OBJECT_CAST (demux), err,
"Error caching the first fragments")); "Could not update the playlist"));
g_error_free (err);
goto error;
} }
g_clear_error (&err); } else {
return FALSE; GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
demux->next_update =
g_get_monotonic_time () +
gst_util_uint64_scale (gst_m3u8_client_get_target_duration
(demux->client), G_USEC_PER_SEC, GST_SECOND);
/* Wake up download task */
g_mutex_lock (&demux->download_lock);
g_cond_signal (&demux->download_cond);
g_mutex_unlock (&demux->download_lock);
} }
/* make sure we stop caching fragments if something cancelled it */
if (demux->cancelled)
return FALSE;
gst_hls_demux_switch_playlist (demux);
} }
gst_element_post_message (GST_ELEMENT (demux),
gst_message_new_buffering (GST_OBJECT (demux), 100));
/* Start downloading 1s early to keep the risk of quit:
* underflows lower */ {
demux->next_update = g_get_monotonic_time () - G_USEC_PER_SEC; GST_DEBUG_OBJECT (demux, "Stopped updates task");
gst_task_pause (demux->updates_task);
return;
}
demux->need_cache = FALSE; error:
return TRUE; {
GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
gst_hls_demux_pause_tasks (demux);
}
} }
static gchar * static gchar *
@ -1245,45 +1195,14 @@ retry_failover_protection:
} }
static gboolean static gboolean
gst_hls_demux_schedule (GstHLSDemux * demux) gst_hls_demux_switch_playlist (GstHLSDemux * demux, GstFragment * fragment)
{
gfloat update_factor;
gint count;
/* As defined in §6.3.4. Reloading the Playlist file:
* "If the client reloads a Playlist file and finds that it has not
* changed then it MUST wait for a period of time before retrying. The
* minimum delay is a multiple of the target duration. This multiple is
* 0.5 for the first attempt, 1.5 for the second, and 3.0 thereafter."
*/
count = demux->client->update_failed_count;
if (count == 0)
update_factor = 1.0;
else
update_factor = 0.5;
/* schedule the next update using the target duration field of the
* playlist */
demux->next_update +=
gst_util_uint64_scale (gst_m3u8_client_get_current_fragment_duration
(demux->client), G_USEC_PER_SEC * update_factor, GST_SECOND);
GST_DEBUG_OBJECT (demux, "Next update scheduled at %" G_GINT64_FORMAT,
demux->next_update);
return TRUE;
}
static gboolean
gst_hls_demux_switch_playlist (GstHLSDemux * demux)
{ {
GstClockTime diff; GstClockTime diff;
gsize size; gsize size;
gint bitrate; gint bitrate;
GstFragment *fragment;
GstBuffer *buffer; GstBuffer *buffer;
GST_M3U8_CLIENT_LOCK (demux->client); GST_M3U8_CLIENT_LOCK (demux->client);
fragment = g_queue_peek_tail (demux->queue);
if (!demux->client->main->lists || !fragment) { if (!demux->client->main->lists || !fragment) {
GST_M3U8_CLIENT_UNLOCK (demux->client); GST_M3U8_CLIENT_UNLOCK (demux->client);
return TRUE; return TRUE;
@ -1292,13 +1211,13 @@ gst_hls_demux_switch_playlist (GstHLSDemux * demux)
/* compare the time when the fragment was downloaded with the time when it was /* compare the time when the fragment was downloaded with the time when it was
* scheduled */ * scheduled */
diff = g_get_monotonic_time () - demux->next_update; diff = g_get_monotonic_time () - demux->next_download;
buffer = gst_fragment_get_buffer (fragment); buffer = gst_fragment_get_buffer (fragment);
size = gst_buffer_get_size (buffer); size = gst_buffer_get_size (buffer);
bitrate = (size * 8) / ((double) diff / G_USEC_PER_SEC); bitrate = (size * 8) / ((double) diff / G_USEC_PER_SEC);
GST_DEBUG ("Downloaded %d bytes in %" GST_TIME_FORMAT ". Bitrate is : %d", GST_DEBUG ("Downloaded %d bytes in %" GST_TIME_FORMAT ". Bitrate is : %d",
(guint) size, GST_TIME_ARGS (diff), bitrate); (guint) size, GST_TIME_ARGS (diff * GST_USECOND), bitrate);
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
return gst_hls_demux_change_playlist (demux, bitrate * demux->bitrate_limit); return gst_hls_demux_change_playlist (demux, bitrate * demux->bitrate_limit);
@ -1441,9 +1360,9 @@ decrypt_error:
return ret; return ret;
} }
static gboolean static GstFragment *
gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean caching, gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
GError ** err) gboolean * end_of_playlist, GError ** err)
{ {
GstFragment *download; GstFragment *download;
const gchar *next_fragment_uri; const gchar *next_fragment_uri;
@ -1454,12 +1373,12 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean caching,
const gchar *key = NULL; const gchar *key = NULL;
const guint8 *iv = NULL; const guint8 *iv = NULL;
*end_of_playlist = FALSE;
if (!gst_m3u8_client_get_next_fragment (demux->client, &discont, if (!gst_m3u8_client_get_next_fragment (demux->client, &discont,
&next_fragment_uri, &duration, &timestamp, &key, &iv)) { &next_fragment_uri, &duration, &timestamp, &key, &iv)) {
GST_INFO_OBJECT (demux, "This playlist doesn't contain more fragments"); GST_INFO_OBJECT (demux, "This playlist doesn't contain more fragments");
demux->end_of_playlist = TRUE; *end_of_playlist = TRUE;
gst_task_start (demux->stream_task); return NULL;
return FALSE;
} }
GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri); GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri);
@ -1516,16 +1435,10 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean caching,
/* The buffer ref is still kept inside the fragment download */ /* The buffer ref is still kept inside the fragment download */
gst_buffer_unref (buf); gst_buffer_unref (buf);
GST_DEBUG_OBJECT (demux, "Pushing fragment in queue"); return download;
g_queue_push_tail (demux->queue, download);
if (!caching) {
GST_TASK_SIGNAL (demux->updates_task);
gst_task_start (demux->stream_task);
}
return TRUE;
error: error:
{ {
return FALSE; return NULL;
} }
} }

View file

@ -66,9 +66,6 @@ struct _GstHLSDemux
GstCaps *input_caps; GstCaps *input_caps;
GstUriDownloader *downloader; GstUriDownloader *downloader;
GstM3U8Client *client; /* M3U8 client */ GstM3U8Client *client; /* M3U8 client */
GQueue *queue; /* Queue storing the fetched fragments */
gboolean need_cache; /* Wheter we need to cache some fragments before starting to push data */
gboolean end_of_playlist;
gboolean do_typefind; /* Whether we need to typefind the next buffer */ gboolean do_typefind; /* Whether we need to typefind the next buffer */
/* Properties */ /* Properties */
@ -80,13 +77,19 @@ struct _GstHLSDemux
GstTask *stream_task; GstTask *stream_task;
GRecMutex stream_lock; GRecMutex stream_lock;
gboolean stop_stream_task; gboolean stop_stream_task;
GMutex download_lock; /* Used for protecting queue and the two conds */
GCond download_cond; /* Signalled when something is added to the queue */
gboolean end_of_playlist;
gint download_failed_count;
gint64 next_download;
/* Updates task */ /* Updates task */
GstTask *updates_task; GstTask *updates_task;
GRecMutex updates_lock; GRecMutex updates_lock;
gint64 next_update; /* Time of the next update */
gboolean stop_updates_task;
GMutex updates_timed_lock; GMutex updates_timed_lock;
gint64 next_update; /* Time of the next update */ GCond updates_timed_cond; /* Signalled when the playlist should be updated */
gboolean cancelled;
/* Position in the stream */ /* Position in the stream */
GstClockTime position_shift; GstClockTime position_shift;

View file

@ -598,7 +598,6 @@ gst_m3u8_client_get_next_fragment (GstM3U8Client * client,
g_return_val_if_fail (client != NULL, FALSE); g_return_val_if_fail (client != NULL, FALSE);
g_return_val_if_fail (client->current != NULL, FALSE); g_return_val_if_fail (client->current != NULL, FALSE);
g_return_val_if_fail (discontinuity != NULL, FALSE);
GST_M3U8_CLIENT_LOCK (client); GST_M3U8_CLIENT_LOCK (client);
GST_DEBUG ("Looking for fragment %d", client->sequence); GST_DEBUG ("Looking for fragment %d", client->sequence);
@ -609,22 +608,53 @@ gst_m3u8_client_get_next_fragment (GstM3U8Client * client,
return FALSE; return FALSE;
} }
gst_m3u8_client_get_current_position (client, timestamp);
file = GST_M3U8_MEDIA_FILE (l->data); file = GST_M3U8_MEDIA_FILE (l->data);
GST_DEBUG ("Got fragment with sequence %u (client sequence %u)",
file->sequence, client->sequence);
*discontinuity = client->sequence != file->sequence; if (timestamp)
client->sequence = file->sequence + 1; gst_m3u8_client_get_current_position (client, timestamp);
*uri = file->uri; if (discontinuity)
*duration = file->duration; *discontinuity = client->sequence != file->sequence;
*key = file->key; if (uri)
*iv = file->iv; *uri = file->uri;
if (duration)
*duration = file->duration;
if (key)
*key = file->key;
if (iv)
*iv = file->iv;
GST_M3U8_CLIENT_UNLOCK (client); GST_M3U8_CLIENT_UNLOCK (client);
return TRUE; return TRUE;
} }
void
gst_m3u8_client_advance_fragment (GstM3U8Client * client)
{
GList *l;
GstM3U8MediaFile *file;
g_return_if_fail (client != NULL);
g_return_if_fail (client->current != NULL);
GST_M3U8_CLIENT_LOCK (client);
GST_DEBUG ("Looking for fragment %d", client->sequence);
l = g_list_find_custom (client->current->files, client,
(GCompareFunc) _find_next);
if (l == NULL) {
GST_ERROR ("Could not find current fragment");
GST_M3U8_CLIENT_UNLOCK (client);
return;
}
file = GST_M3U8_MEDIA_FILE (l->data);
GST_DEBUG ("Advancing from sequence %u", file->sequence);
client->sequence = file->sequence + 1;
GST_M3U8_CLIENT_UNLOCK (client);
}
static void static void
_sum_duration (GstM3U8MediaFile * self, GstClockTime * duration) _sum_duration (GstM3U8MediaFile * self, GstClockTime * duration)
{ {

View file

@ -87,6 +87,7 @@ void gst_m3u8_client_set_current (GstM3U8Client * client, GstM3U8 * m3u8);
gboolean gst_m3u8_client_get_next_fragment (GstM3U8Client * client, gboolean gst_m3u8_client_get_next_fragment (GstM3U8Client * client,
gboolean * discontinuity, const gchar ** uri, GstClockTime * duration, gboolean * discontinuity, const gchar ** uri, GstClockTime * duration,
GstClockTime * timestamp, const gchar ** key, const guint8 ** iv); GstClockTime * timestamp, const gchar ** key, const guint8 ** iv);
void gst_m3u8_client_advance_fragment (GstM3U8Client * client);
void gst_m3u8_client_get_current_position (GstM3U8Client * client, void gst_m3u8_client_get_current_position (GstM3U8Client * client,
GstClockTime * timestamp); GstClockTime * timestamp);
GstClockTime gst_m3u8_client_get_duration (GstM3U8Client * client); GstClockTime gst_m3u8_client_get_duration (GstM3U8Client * client);