hlsdemux: replace uridownloader with a GstElement

The GstElement is directly linked into a ghost pad and
its buffers are pushed as received downstream. This way the
buffers are small enough and not a whole fragment that usually
causes extra latency and makes buffering harder
This commit is contained in:
Thiago Santos 2014-04-07 13:57:26 -03:00
parent eee4f95a1f
commit 3611759557
2 changed files with 204 additions and 153 deletions

View file

@ -47,6 +47,7 @@
#else
#include <gcrypt.h>
#endif
#include <gst/base/gsttypefindhelper.h>
#include "gsthlsdemux.h"
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
@ -101,9 +102,11 @@ static void gst_hls_demux_stream_loop (GstHLSDemux * demux);
static void gst_hls_demux_updates_loop (GstHLSDemux * demux);
static void gst_hls_demux_stop (GstHLSDemux * demux);
static void gst_hls_demux_pause_tasks (GstHLSDemux * demux);
#if 0
static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux,
GstFragment * fragment);
static GstFragment *gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
#endif
static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
gboolean * end_of_playlist, GError ** err);
static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux,
gboolean update, GError ** err);
@ -116,7 +119,7 @@ static gboolean gst_hls_demux_change_playlist (GstHLSDemux * demux,
guint max_bitrate);
#define gst_hls_demux_parent_class parent_class
G_DEFINE_TYPE (GstHLSDemux, gst_hls_demux, GST_TYPE_ELEMENT);
G_DEFINE_TYPE (GstHLSDemux, gst_hls_demux, GST_TYPE_BIN);
static void
gst_hls_demux_dispose (GObject * obj)
@ -146,6 +149,8 @@ gst_hls_demux_dispose (GObject * obj)
g_cond_clear (&demux->download_cond);
g_mutex_clear (&demux->updates_timed_lock);
g_cond_clear (&demux->updates_timed_cond);
g_mutex_clear (&demux->fragment_download_lock);
g_cond_clear (&demux->fragment_download_cond);
G_OBJECT_CLASS (parent_class)->dispose (obj);
}
@ -226,6 +231,8 @@ gst_hls_demux_init (GstHLSDemux * demux)
g_cond_init (&demux->download_cond);
g_mutex_init (&demux->updates_timed_lock);
g_cond_init (&demux->updates_timed_cond);
g_mutex_init (&demux->fragment_download_lock);
g_cond_init (&demux->fragment_download_cond);
/* Updates task */
g_rec_mutex_init (&demux->updates_lock);
@ -238,6 +245,9 @@ gst_hls_demux_init (GstHLSDemux * demux)
demux->stream_task =
gst_task_new ((GstTaskFunction) gst_hls_demux_stream_loop, demux, NULL);
gst_task_set_lock (demux->stream_task, &demux->stream_lock);
demux->src = gst_element_factory_make ("souphttpsrc", "hls-download-src");
gst_element_set_locked_state (demux->src, TRUE);
gst_bin_add (GST_BIN_CAST (demux), demux->src);
demux->have_group_id = FALSE;
demux->group_id = G_MAXUINT;
@ -305,6 +315,7 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_element_set_state (demux->src, GST_STATE_NULL);
gst_hls_demux_stop (demux);
gst_task_join (demux->updates_task);
gst_task_join (demux->stream_task);
@ -687,12 +698,129 @@ gst_hls_demux_stop (GstHLSDemux * demux)
demux->stop_stream_task = TRUE;
g_cond_signal (&demux->download_cond);
g_mutex_unlock (&demux->download_lock);
g_mutex_lock (&demux->fragment_download_lock);
g_cond_signal (&demux->fragment_download_cond);
g_mutex_unlock (&demux->fragment_download_lock);
gst_task_stop (demux->stream_task);
g_rec_mutex_lock (&demux->stream_lock);
g_rec_mutex_unlock (&demux->stream_lock);
}
}
static GstFlowReturn
_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstPad *srcpad = (GstPad *) parent;
GstHLSDemux *demux = (GstHLSDemux *) GST_PAD_PARENT (srcpad);
GstFlowReturn ret;
GstCaps *caps;
/* We actually need to do this every time we switch bitrate */
if (G_UNLIKELY (demux->do_typefind)) {
caps = gst_type_find_helper_for_buffer (NULL, buffer, NULL);
if (G_UNLIKELY (!caps)) {
GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
("Could not determine type of stream"), (NULL));
gst_buffer_unref (buffer);
return GST_FLOW_NOT_NEGOTIATED;
}
if (!demux->input_caps || !gst_caps_is_equal (caps, demux->input_caps)) {
gst_caps_replace (&demux->input_caps, caps);
GST_INFO_OBJECT (demux, "Input source caps: %" GST_PTR_FORMAT,
demux->input_caps);
demux->do_typefind = FALSE;
}
gst_pad_set_caps (srcpad, caps);
gst_caps_unref (caps);
}
if (demux->discont) {
GST_DEBUG_OBJECT (demux, "Marking fragment as discontinuous");
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
demux->discont = FALSE;
} else if (demux->starting_fragment && demux->segment.rate < 0) {
/* Set DISCONT flag for every buffer in reverse playback mode
* as each fragment for its own has to be reversed */
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
} else {
GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
}
demux->starting_fragment = FALSE;
GST_DEBUG_OBJECT (demux, "set fragment pts=%" GST_TIME_FORMAT,
GST_TIME_ARGS (demux->current_timestamp));
GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
GST_BUFFER_PTS (buffer) = demux->current_timestamp;
demux->segment.position = GST_BUFFER_TIMESTAMP (buffer);
#if 0
if (demux->segment.rate > 0)
demux->segment.position += GST_BUFFER_DURATION (buf);
#endif
if (demux->need_segment) {
/* And send a newsegment */
GST_DEBUG_OBJECT (demux, "Sending segment event: %"
GST_SEGMENT_FORMAT, &demux->segment);
gst_pad_push_event (demux->srcpad, gst_event_new_segment (&demux->segment));
demux->need_segment = FALSE;
}
ret = gst_proxy_pad_chain_default (pad, parent, buffer);
if (ret != GST_FLOW_OK) {
if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL),
("stream stopped, reason %s", gst_flow_get_name (ret)));
gst_pad_push_event (demux->srcpad, gst_event_new_eos ());
} else {
GST_DEBUG_OBJECT (demux, "stream stopped, reason %s",
gst_flow_get_name (ret));
}
gst_hls_demux_pause_tasks (demux);
}
/* avoid having the source handle the same error again */
ret = GST_FLOW_OK;
return ret;
}
static gboolean
_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstPad *srcpad = GST_PAD_CAST (parent);
GstHLSDemux *demux = (GstHLSDemux *) GST_PAD_PARENT (srcpad);;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
g_cond_signal (&demux->fragment_download_cond);
break;
default:
break;
}
return TRUE;
}
static gboolean
_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_ALLOCATION:
return FALSE;
break;
default:
break;
}
return gst_pad_query_default (pad, parent, query);
}
static void
switch_pads (GstHLSDemux * demux, GstCaps * newcaps)
{
@ -700,14 +828,35 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps)
GstEvent *event;
gchar *stream_id;
gchar *name;
GstPad *target;
GstPadTemplate *tmpl;
GstProxyPad *internal_pad;
GST_DEBUG ("Switching pads (oldpad:%p) with caps: %" GST_PTR_FORMAT, oldpad,
GST_DEBUG_OBJECT (demux,
"Switching pads (oldpad:%p) with caps: %" GST_PTR_FORMAT, oldpad,
newcaps);
target = gst_element_get_static_pad (demux->src, "src");
/* First create and activate new pad */
name = g_strdup_printf ("src_%u", demux->srcpad_counter++);
demux->srcpad = gst_pad_new_from_static_template (&srctemplate, name);
tmpl = gst_static_pad_template_get (&srctemplate);
demux->srcpad = gst_ghost_pad_new_from_template (name, target, tmpl);
gst_object_unref (tmpl);
g_free (name);
gst_object_unref (target);
/* set up our internal pad to drop all events from
* the http src we don't care about. On the chain function
* we just push the buffer forward, but this way hls can get
* the flow return from downstream */
internal_pad = gst_proxy_pad_get_internal (GST_PROXY_PAD (demux->srcpad));
gst_pad_set_chain_function (GST_PAD_CAST (internal_pad), _src_chain);
gst_pad_set_event_function (GST_PAD_CAST (internal_pad), _src_event);
/* need to set query otherwise deadlocks happen with allocation queries */
gst_pad_set_query_function (GST_PAD_CAST (internal_pad), _src_query);
gst_object_unref (internal_pad);
gst_pad_set_event_function (demux->srcpad,
GST_DEBUG_FUNCPTR (gst_hls_demux_src_event));
gst_pad_set_query_function (demux->srcpad,
@ -752,54 +901,28 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps)
}
static gboolean
gst_hls_demux_configure_src_pad (GstHLSDemux * demux, GstFragment * fragment)
gst_hls_demux_configure_src_pad (GstHLSDemux * demux, GstCaps * bufcaps)
{
GstCaps *bufcaps = NULL, *srccaps = NULL;
GstBuffer *buf = NULL;
GstCaps *srccaps = NULL;
/* Figure out if we need to create/switch pads */
if (G_LIKELY (demux->srcpad))
srccaps = gst_pad_get_current_caps (demux->srcpad);
if (fragment) {
bufcaps = gst_fragment_get_caps (fragment);
if (G_UNLIKELY (!bufcaps)) {
if (srccaps)
gst_caps_unref (srccaps);
return FALSE;
}
buf = gst_fragment_get_buffer (fragment);
}
if (G_UNLIKELY (!srccaps || demux->discont || (buf
&& GST_BUFFER_IS_DISCONT (buf)))) {
if (G_UNLIKELY (!srccaps || demux->discont)) {
switch_pads (demux, bufcaps);
demux->need_segment = TRUE;
demux->discont = FALSE;
if (buf)
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
}
if (bufcaps)
gst_caps_unref (bufcaps);
if (G_LIKELY (srccaps))
gst_caps_unref (srccaps);
if (demux->need_segment) {
/* And send a newsegment */
GST_DEBUG_OBJECT (demux, "Sending segment event: %"
GST_SEGMENT_FORMAT, &demux->segment);
gst_pad_push_event (demux->srcpad, gst_event_new_segment (&demux->segment));
demux->need_segment = FALSE;
}
if (buf)
gst_buffer_unref (buf);
return TRUE;
}
static void
gst_hls_demux_stream_loop (GstHLSDemux * demux)
{
GstFragment *fragment;
GstBuffer *buf;
GstFlowReturn ret;
gboolean end_of_playlist;
GError *err = NULL;
@ -827,9 +950,7 @@ gst_hls_demux_stream_loop (GstHLSDemux * demux)
}
demux->next_download = g_get_monotonic_time ();
if ((fragment =
gst_hls_demux_get_next_fragment (demux, &end_of_playlist,
&err)) == NULL) {
if (!gst_hls_demux_get_next_fragment (demux, &end_of_playlist, &err)) {
if (demux->stop_stream_task) {
g_clear_error (&err);
goto pause_task;
@ -914,45 +1035,20 @@ gst_hls_demux_stream_loop (GstHLSDemux * demux)
gst_m3u8_client_advance_fragment (demux->client, demux->segment.rate > 0);
if (demux->stop_updates_task) {
g_object_unref (fragment);
goto pause_task;
}
}
if (demux->stop_updates_task) {
g_object_unref (fragment);
goto pause_task;
}
if (!gst_hls_demux_configure_src_pad (demux, fragment)) {
g_object_unref (fragment);
goto type_not_found;
}
buf = gst_fragment_get_buffer (fragment);
GST_DEBUG_OBJECT (demux, "Pushing buffer %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
/* Set DISCONT flag for every buffer in reverse playback mode
* as each fragment for its own has to be reversed */
if (demux->segment.rate < 0) {
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
}
demux->segment.position = GST_BUFFER_TIMESTAMP (buf);
if (demux->segment.rate > 0)
demux->segment.position += GST_BUFFER_DURATION (buf);
ret = gst_pad_push (demux->srcpad, buf);
if (ret != GST_FLOW_OK)
goto error_pushing;
#if 0
/* try to switch to another bitrate if needed */
gst_hls_demux_switch_playlist (demux, fragment);
g_object_unref (fragment);
#endif
GST_DEBUG_OBJECT (demux, "Pushed buffer");
GST_DEBUG_OBJECT (demux, "Finished pushing fragment");
return;
@ -967,29 +1063,6 @@ end_of_playlist:
return;
}
type_not_found:
{
GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
("Could not determine type of stream"), (NULL));
gst_hls_demux_pause_tasks (demux);
return;
}
error_pushing:
{
g_object_unref (fragment);
if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL),
("stream stopped, reason %s", gst_flow_get_name (ret)));
gst_pad_push_event (demux->srcpad, gst_event_new_eos ());
} else {
GST_DEBUG_OBJECT (demux, "stream stopped, reason %s",
gst_flow_get_name (ret));
}
gst_hls_demux_pause_tasks (demux);
return;
}
pause_task:
{
GST_DEBUG_OBJECT (demux, "Pause task");
@ -1252,8 +1325,8 @@ gst_hls_demux_update_playlist (GstHLSDemux * demux, gboolean update,
GST_M3U8_CLIENT_LOCK (demux->client);
last_sequence =
GST_M3U8_MEDIA_FILE (g_list_last (demux->client->current->files)->
data)->sequence;
GST_M3U8_MEDIA_FILE (g_list_last (demux->client->current->
files)->data)->sequence;
if (demux->client->sequence >= last_sequence - 3) {
GST_DEBUG_OBJECT (demux, "Sequence is beyond playlist. Moving back to %u",
@ -1353,8 +1426,8 @@ retry_failover_protection:
gst_m3u8_client_set_current (demux->client, previous_variant->data);
/* Try a lower bitrate (or stop if we just tried the lowest) */
if (GST_M3U8 (previous_variant->data)->iframe && new_bandwidth ==
GST_M3U8 (g_list_first (demux->client->main->iframe_lists)->
data)->bandwidth)
GST_M3U8 (g_list_first (demux->client->main->iframe_lists)->data)->
bandwidth)
return FALSE;
else if (!GST_M3U8 (previous_variant->data)->iframe && new_bandwidth ==
GST_M3U8 (g_list_first (demux->client->main->lists)->data)->bandwidth)
@ -1369,6 +1442,7 @@ retry_failover_protection:
return TRUE;
}
#if 0
static gboolean
gst_hls_demux_switch_playlist (GstHLSDemux * demux, GstFragment * fragment)
{
@ -1411,7 +1485,9 @@ gst_hls_demux_switch_playlist (GstHLSDemux * demux, GstFragment * fragment)
return gst_hls_demux_change_playlist (demux, bitrate * demux->bitrate_limit);
}
#endif
#if 0
#ifdef HAVE_NETTLE
static gboolean
decrypt_fragment (GstHLSDemux * demux, gsize length,
@ -1549,16 +1625,15 @@ decrypt_error:
g_object_unref (encrypted_fragment);
return ret;
}
#endif
static GstFragment *
static gboolean
gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
gboolean * end_of_playlist, GError ** err)
{
GstFragment *download;
const gchar *next_fragment_uri;
GstClockTime duration;
GstClockTime timestamp;
GstBuffer *buf;
gboolean discont;
gint64 range_start, range_end;
const gchar *key = NULL;
@ -1570,74 +1645,43 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
&key, &iv, demux->segment.rate > 0)) {
GST_INFO_OBJECT (demux, "This playlist doesn't contain more fragments");
*end_of_playlist = TRUE;
return NULL;
return FALSE;
}
GST_INFO_OBJECT (demux,
if (!gst_hls_demux_configure_src_pad (demux, NULL)) {
*end_of_playlist = FALSE;
return FALSE;
}
g_mutex_lock (&demux->fragment_download_lock);
GST_DEBUG_OBJECT (demux,
"Fetching next fragment %s (range=%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT
")", next_fragment_uri, range_start, range_end);
download =
gst_uri_downloader_fetch_uri_with_range_and_referer (demux->downloader,
next_fragment_uri, demux->client->main ? demux->client->main->uri : NULL,
FALSE, range_start, range_end, err);
/* set up our source for download */
demux->current_timestamp = timestamp;
demux->starting_fragment = TRUE;
g_object_set (demux->src, "location", next_fragment_uri, NULL);
gst_element_set_state (demux->src, GST_STATE_READY); /* TODO check return */
gst_element_send_event (demux->src, gst_event_new_seek (1.0, GST_FORMAT_BYTES,
(GstSeekFlags) GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, range_start,
GST_SEEK_TYPE_SET, range_end));
if (download == NULL)
goto error;
gst_element_sync_state_with_parent (demux->src);
/* wait for the fragment to be completely downloaded */
g_cond_wait (&demux->fragment_download_cond, &demux->fragment_download_lock);
g_mutex_unlock (&demux->fragment_download_lock);
gst_element_set_state (demux->src, GST_STATE_NULL);
#if 0
if (key) {
download = gst_hls_demux_decrypt_fragment (demux, download, key, iv, err);
if (download == NULL)
goto error;
}
#endif
buf = gst_fragment_get_buffer (download);
GST_DEBUG_OBJECT (demux, "set fragment pts=%" GST_TIME_FORMAT " duration=%"
GST_TIME_FORMAT, GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
GST_BUFFER_DURATION (buf) = duration;
GST_BUFFER_PTS (buf) = timestamp;
GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE;
/* We actually need to do this every time we switch bitrate */
if (G_UNLIKELY (demux->do_typefind)) {
GstCaps *caps = gst_fragment_get_caps (download);
if (G_UNLIKELY (!caps)) {
GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
("Could not determine type of stream"), (NULL));
gst_buffer_unref (buf);
g_object_unref (download);
goto error;
}
if (!demux->input_caps || !gst_caps_is_equal (caps, demux->input_caps)) {
gst_caps_replace (&demux->input_caps, caps);
/* gst_pad_set_caps (demux->srcpad, demux->input_caps); */
GST_INFO_OBJECT (demux, "Input source caps: %" GST_PTR_FORMAT,
demux->input_caps);
demux->do_typefind = FALSE;
}
gst_caps_unref (caps);
} else {
gst_fragment_set_caps (download, demux->input_caps);
}
if (discont) {
GST_DEBUG_OBJECT (demux, "Marking fragment as discontinuous");
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
} else {
GST_BUFFER_FLAG_UNSET (buf, GST_BUFFER_FLAG_DISCONT);
}
/* The buffer ref is still kept inside the fragment download */
gst_buffer_unref (buf);
return download;
error:
{
return NULL;
}
return TRUE;
}

View file

@ -53,7 +53,7 @@ typedef struct _GstHLSDemuxClass GstHLSDemuxClass;
*/
struct _GstHLSDemux
{
GstElement parent;
GstBin parent;
GstPad *sinkpad;
GstPad *srcpad;
@ -103,11 +103,18 @@ struct _GstHLSDemux
/* Current download rate (bps) */
gint current_download_rate;
/* fragment download tooling */
GstElement *src;
GMutex fragment_download_lock;
GCond fragment_download_cond;
GstClockTime current_timestamp;
gboolean starting_fragment;
};
struct _GstHLSDemuxClass
{
GstElementClass parent_class;
GstBinClass parent_class;
};
GType gst_hls_demux_get_type (void);