diff --git a/gst/hls/Makefile.am b/gst/hls/Makefile.am index fec8eeb278..05f0ac89f1 100644 --- a/gst/hls/Makefile.am +++ b/gst/hls/Makefile.am @@ -4,6 +4,8 @@ plugin_LTLIBRARIES = libgstfragmented.la libgstfragmented_la_SOURCES = \ m3u8.c \ gsthlsdemux.c \ + gstfragment.c \ + gsturidownloader.c \ gstfragmentedplugin.c libgstfragmented_la_CFLAGS = $(GST_PLUGINS_BAD_CFLAGS) $(GST_BASE_CFLAGS) $(GST_CFLAGS) $(SOUP_CFLAGS) @@ -14,7 +16,9 @@ libgstfragmented_la_LIBTOOLFLAGS = --tag=disable-static # headers we need but don't want installed noinst_HEADERS = \ gstfragmented.h \ + gstfragment.h \ gsthlsdemux.h \ + gsturidownloader.h \ m3u8.h Android.mk: Makefile.am $(BUILT_SOURCES) diff --git a/gst/hls/gstfragment.c b/gst/hls/gstfragment.c new file mode 100644 index 0000000000..00a3b4ce34 --- /dev/null +++ b/gst/hls/gstfragment.c @@ -0,0 +1,222 @@ +/* GStreamer + * Copyright (C) 2011 Andoni Morales Alastruey + * + * gstfragment.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include +#include "gstfragmented.h" +#include "gstfragment.h" + +#define GST_CAT_DEFAULT fragmented_debug + +#define GST_FRAGMENT_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_FRAGMENT, GstFragmentPrivate)) + +enum +{ + PROP_0, + PROP_INDEX, + PROP_NAME, + PROP_DURATION, + PROP_DISCONTINOUS, + PROP_BUFFER_LIST, + PROP_LAST +}; + +struct _GstFragmentPrivate +{ + GstBufferList *buffer_list; + GstBufferListIterator *buffer_iterator; + gboolean headers_set; +}; + +G_DEFINE_TYPE (GstFragment, gst_fragment, G_TYPE_OBJECT); + +static void gst_fragment_dispose (GObject * object); +static void gst_fragment_finalize (GObject * object); + +static void +gst_fragment_get_property (GObject * object, + guint property_id, GValue * value, GParamSpec * pspec) +{ + GstFragment *fragment = GST_FRAGMENT (object); + + switch (property_id) { + case PROP_INDEX: + g_value_set_uint (value, fragment->index); + break; + + case PROP_NAME: + g_value_set_string (value, fragment->name); + break; + + case PROP_DURATION: + g_value_set_uint64 (value, fragment->stop_time - fragment->start_time); + break; + + case PROP_DISCONTINOUS: + g_value_set_boolean (value, fragment->discontinuous); + break; + + case PROP_BUFFER_LIST: + g_value_set_object (value, gst_fragment_get_buffer_list (fragment)); + break; + + default: + /* We don't have any other property... */ + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +gst_fragment_class_init (GstFragmentClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + g_type_class_add_private (klass, sizeof (GstFragmentPrivate)); + + gobject_class->get_property = gst_fragment_get_property; + gobject_class->dispose = gst_fragment_dispose; + gobject_class->finalize = gst_fragment_finalize; + + g_object_class_install_property (gobject_class, PROP_INDEX, + g_param_spec_uint ("index", "Index", "Index of the fragment", 0, + G_MAXUINT, 0, G_PARAM_READABLE)); + + g_object_class_install_property (gobject_class, PROP_NAME, + g_param_spec_string ("name", "Name", + "Name of the fragment (eg:fragment-12.ts)", NULL, G_PARAM_READABLE)); + + g_object_class_install_property (gobject_class, PROP_DISCONTINOUS, + g_param_spec_boolean ("discontinuous", "Discontinous", + "Whether this fragment has a discontinuity or not", + FALSE, G_PARAM_READABLE)); + + g_object_class_install_property (gobject_class, PROP_DURATION, + g_param_spec_uint64 ("duration", "Fragment duration", + "Duration of the fragment", 0, G_MAXUINT64, 0, G_PARAM_READABLE)); + + g_object_class_install_property (gobject_class, PROP_BUFFER_LIST, + g_param_spec_object ("buffer-list", "Buffer List", + "A list with the fragment's buffers", GST_TYPE_FRAGMENT, + G_PARAM_READABLE)); +} + +static void +gst_fragment_init (GstFragment * fragment) +{ + GstFragmentPrivate *priv; + + fragment->priv = priv = GST_FRAGMENT_GET_PRIVATE (fragment); + + priv->buffer_list = gst_buffer_list_new (); + priv->buffer_iterator = gst_buffer_list_iterate (priv->buffer_list); + gst_buffer_list_iterator_add_group (priv->buffer_iterator); + priv->headers_set = FALSE; + fragment->download_start_time = g_get_real_time (); + fragment->start_time = 0; + fragment->stop_time = 0; + fragment->index = 0; + fragment->name = g_strdup (""); + fragment->completed = FALSE; + fragment->discontinuous = FALSE; +} + +GstFragment * +gst_fragment_new (void) +{ + return GST_FRAGMENT (g_object_new (GST_TYPE_FRAGMENT, NULL)); +} + +static void +gst_fragment_finalize (GObject * gobject) +{ + GstFragment *fragment = GST_FRAGMENT (gobject); + + g_free (fragment->name); + + G_OBJECT_CLASS (gst_fragment_parent_class)->finalize (gobject); +} + +void +gst_fragment_dispose (GObject * object) +{ + GstFragmentPrivate *priv = GST_FRAGMENT (object)->priv; + + if (priv->buffer_list != NULL) { + gst_buffer_list_iterator_free (priv->buffer_iterator); + gst_buffer_list_unref (priv->buffer_list); + priv->buffer_list = NULL; + } + + G_OBJECT_CLASS (gst_fragment_parent_class)->dispose (object); +} + +GstBufferList * +gst_fragment_get_buffer_list (GstFragment * fragment) +{ + g_return_val_if_fail (fragment != NULL, NULL); + + if (!fragment->completed) + return NULL; + + gst_buffer_list_ref (fragment->priv->buffer_list); + return fragment->priv->buffer_list; +} + +gboolean +gst_fragment_set_headers (GstFragment * fragment, GstBuffer ** buffer, + guint count) +{ + guint i; + + g_return_val_if_fail (fragment != NULL, FALSE); + g_return_val_if_fail (buffer != NULL, FALSE); + + if (fragment->priv->headers_set) + return FALSE; + + for (i = 0; i < count; i++) { + /* We steal the buffers you pass in */ + gst_buffer_list_iterator_add (fragment->priv->buffer_iterator, buffer[i]); + gst_buffer_list_iterator_add_group (fragment->priv->buffer_iterator); + } + return TRUE; +} + +gboolean +gst_fragment_add_buffer (GstFragment * fragment, GstBuffer * buffer) +{ + g_return_val_if_fail (fragment != NULL, FALSE); + g_return_val_if_fail (buffer != NULL, FALSE); + + if (fragment->completed) { + GST_WARNING ("Fragment is completed, could not add more buffers"); + return FALSE; + } + + /* if this is the first buffer forbid setting the headers anymore */ + if (G_UNLIKELY (fragment->priv->headers_set == FALSE)) + fragment->priv->headers_set = TRUE; + + GST_DEBUG ("Adding new buffer to the fragment"); + /* We steal the buffers you pass in */ + gst_buffer_list_iterator_add (fragment->priv->buffer_iterator, buffer); + return TRUE; +} diff --git a/gst/hls/gstfragment.h b/gst/hls/gstfragment.h new file mode 100644 index 0000000000..9ea018606e --- /dev/null +++ b/gst/hls/gstfragment.h @@ -0,0 +1,69 @@ +/* GStreamer + * Copyright (C) 2011 Andoni Morales Alastruey + * + * gstfragment.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GSTFRAGMENT_H__ +#define __GSTFRAGMENT_H__ + +#include +#include + +G_BEGIN_DECLS + +#define GST_TYPE_FRAGMENT (gst_fragment_get_type()) +#define GST_FRAGMENT(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_FRAGMENT,GstFragment)) +#define GST_FRAGMENT_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_FRAGMENT,GstFragmentClass)) +#define GST_IS_FRAGMENT(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_FRAGMENT)) +#define GST_IS_FRAGMENT_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_FRAGMENT)) + +typedef struct _GstFragment GstFragment; +typedef struct _GstFragmentPrivate GstFragmentPrivate; +typedef struct _GstFragmentClass GstFragmentClass; + +struct _GstFragment +{ + GObject parent; + + gchar * name; /* Name of the fragment */ + gboolean completed; /* Whether the fragment is complete or not */ + guint64 download_start_time; /* Epoch time when the download started */ + guint64 download_stop_time; /* Epoch time when the download finished */ + guint64 start_time; /* Start time of the fragment */ + guint64 stop_time; /* Stop time of the fragment */ + gboolean index; /* Index of the fragment */ + gboolean discontinuous; /* Whether this fragment is discontinuous or not */ + + GstFragmentPrivate *priv; +}; + +struct _GstFragmentClass +{ + GObjectClass parent_class; +}; + +GType gst_fragment_get_type (void); + +GstBufferList * gst_fragment_get_buffer_list (GstFragment *fragment); +gboolean gst_fragment_set_headers (GstFragment *fragment, GstBuffer **buffer, guint count); +gboolean gst_fragment_add_buffer (GstFragment *fragment, GstBuffer *buffer); +GstFragment * gst_fragment_new (void); + +G_END_DECLS +#endif /* __GSTFRAGMENT_H__ */ diff --git a/gst/hls/gsthlsdemux.c b/gst/hls/gsthlsdemux.c index 5e9ebb595e..ca3d64e05d 100644 --- a/gst/hls/gsthlsdemux.c +++ b/gst/hls/gsthlsdemux.c @@ -60,11 +60,6 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-hls")); -static GstStaticPadTemplate fetchertemplate = GST_STATIC_PAD_TEMPLATE ("sink", - GST_PAD_SINK, - GST_PAD_ALWAYS, - GST_STATIC_CAPS_ANY); - GST_DEBUG_CATEGORY_STATIC (gst_hls_demux_debug); #define GST_CAT_DEFAULT gst_hls_demux_debug @@ -95,31 +90,23 @@ static GstStateChangeReturn gst_hls_demux_change_state (GstElement * element, GstStateChange transition); /* GstHLSDemux */ -static GstBusSyncReply gst_hls_demux_fetcher_bus_handler (GstBus * bus, - GstMessage * message, gpointer data); static GstFlowReturn gst_hls_demux_chain (GstPad * pad, GstBuffer * buf); static gboolean gst_hls_demux_sink_event (GstPad * pad, GstEvent * event); static gboolean gst_hls_demux_src_event (GstPad * pad, GstEvent * event); static gboolean gst_hls_demux_src_query (GstPad * pad, GstQuery * query); -static GstFlowReturn gst_hls_demux_fetcher_chain (GstPad * pad, - GstBuffer * buf); -static gboolean gst_hls_demux_fetcher_sink_event (GstPad * pad, - GstEvent * event); -static void gst_hls_demux_loop (GstHLSDemux * demux); +static void gst_hls_demux_stream_loop (GstHLSDemux * demux); +static void gst_hls_demux_updates_loop (GstHLSDemux * demux); static void gst_hls_demux_stop (GstHLSDemux * demux); -static void gst_hls_demux_stop_fetcher_locked (GstHLSDemux * demux, - gboolean cancelled); -static void gst_hls_demux_stop_update (GstHLSDemux * demux); -static gboolean gst_hls_demux_start_update (GstHLSDemux * demux); static gboolean gst_hls_demux_cache_fragments (GstHLSDemux * demux); static gboolean gst_hls_demux_schedule (GstHLSDemux * demux); static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux); -static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux); +static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux, + gboolean caching); static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux); static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose); static gboolean gst_hls_demux_set_location (GstHLSDemux * demux, const gchar * uri); -static gchar *gst_hls_src_buf_to_utf8_playlist (gchar * string, guint size); +static gchar *gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf); static void _do_init (GType type) @@ -155,23 +142,37 @@ gst_hls_demux_dispose (GObject * obj) { GstHLSDemux *demux = GST_HLS_DEMUX (obj); - g_cond_free (demux->fetcher_cond); - g_mutex_free (demux->fetcher_lock); + if (demux->stream_task) { + if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) { + GST_DEBUG_OBJECT (demux, "Leaving streaming task"); + gst_task_stop (demux->stream_task); + gst_task_join (demux->stream_task); + } + gst_object_unref (demux->stream_task); + g_static_rec_mutex_free (&demux->stream_lock); + demux->stream_task = NULL; + } - g_cond_free (demux->thread_cond); - g_mutex_free (demux->thread_lock); + if (demux->updates_task) { + if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) { + GST_DEBUG_OBJECT (demux, "Leaving updates task"); + gst_task_stop (demux->updates_task); + gst_task_join (demux->updates_task); + } + gst_object_unref (demux->updates_task); + g_mutex_free (demux->updates_timed_lock); + g_static_rec_mutex_free (&demux->updates_lock); + demux->updates_task = NULL; + } - gst_task_join (demux->task); - gst_object_unref (demux->task); - g_static_rec_mutex_free (&demux->task_lock); - - gst_object_unref (demux->fetcher_bus); - gst_object_unref (demux->fetcherpad); + if (demux->downloader != NULL) { + g_object_unref (demux->downloader); + demux->downloader = NULL; + } gst_hls_demux_reset (demux, TRUE); g_queue_free (demux->queue); - gst_object_unref (demux->download); G_OBJECT_CLASS (parent_class)->dispose (obj); } @@ -218,15 +219,8 @@ gst_hls_demux_init (GstHLSDemux * demux, GstHLSDemuxClass * klass) GST_DEBUG_FUNCPTR (gst_hls_demux_sink_event)); gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad); - /* fetcher pad */ - demux->fetcherpad = - gst_pad_new_from_static_template (&fetchertemplate, "sink"); - gst_pad_set_chain_function (demux->fetcherpad, - GST_DEBUG_FUNCPTR (gst_hls_demux_fetcher_chain)); - gst_pad_set_event_function (demux->fetcherpad, - GST_DEBUG_FUNCPTR (gst_hls_demux_fetcher_sink_event)); - gst_pad_set_element_private (demux->fetcherpad, demux); - gst_pad_activate_push (demux->fetcherpad, TRUE); + /* Downloader */ + demux->downloader = gst_uri_downloader_new (); demux->do_typefind = TRUE; @@ -234,19 +228,20 @@ gst_hls_demux_init (GstHLSDemux * demux, GstHLSDemuxClass * klass) demux->fragments_cache = DEFAULT_FRAGMENTS_CACHE; demux->bitrate_switch_tol = DEFAULT_BITRATE_SWITCH_TOLERANCE; - demux->download = gst_adapter_new (); - demux->fetcher_bus = gst_bus_new (); - gst_bus_set_sync_handler (demux->fetcher_bus, - gst_hls_demux_fetcher_bus_handler, demux); - demux->thread_cond = g_cond_new (); - demux->thread_lock = g_mutex_new (); - demux->fetcher_cond = g_cond_new (); - demux->fetcher_lock = g_mutex_new (); demux->queue = g_queue_new (); - g_static_rec_mutex_init (&demux->task_lock); - /* FIXME: This really should be a pad task instead */ - demux->task = gst_task_create ((GstTaskFunction) gst_hls_demux_loop, demux); - gst_task_set_lock (demux->task, &demux->task_lock); + + /* Updates task */ + g_static_rec_mutex_init (&demux->updates_lock); + demux->updates_task = + gst_task_create ((GstTaskFunction) gst_hls_demux_updates_loop, demux); + gst_task_set_lock (demux->updates_task, &demux->updates_lock); + demux->updates_timed_lock = g_mutex_new (); + + /* Streaming task */ + g_static_rec_mutex_init (&demux->stream_lock); + demux->stream_task = + gst_task_create ((GstTaskFunction) gst_hls_demux_stream_loop, demux); + gst_task_set_lock (demux->stream_task, &demux->stream_lock); } static void @@ -303,7 +298,7 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition) state and we filled our queue with enough cached fragments */ if (gst_m3u8_client_get_uri (demux->client)[0] != '\0') - gst_hls_demux_start_update (demux); + gst_task_start (demux->updates_task); break; default: break; @@ -313,12 +308,12 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - gst_hls_demux_stop_update (demux); + gst_task_stop (demux->updates_task); break; case GST_STATE_CHANGE_PAUSED_TO_READY: demux->cancelled = TRUE; gst_hls_demux_stop (demux); - gst_task_join (demux->task); + gst_task_join (demux->stream_task); gst_hls_demux_reset (demux, FALSE); break; default: @@ -392,23 +387,20 @@ gst_hls_demux_src_event (GstPad * pad, GstEvent * event) } demux->cancelled = TRUE; - gst_task_pause (demux->task); - g_mutex_lock (demux->fetcher_lock); - gst_hls_demux_stop_fetcher_locked (demux, TRUE); - g_mutex_unlock (demux->fetcher_lock); - gst_hls_demux_stop_update (demux); - gst_task_pause (demux->task); + gst_task_pause (demux->stream_task); + gst_uri_downloader_cancel (demux->downloader); + gst_task_stop (demux->updates_task); + gst_task_pause (demux->stream_task); /* wait for streaming to finish */ - g_static_rec_mutex_lock (&demux->task_lock); + g_static_rec_mutex_lock (&demux->stream_lock); demux->need_cache = TRUE; while (!g_queue_is_empty (demux->queue)) { - GstBuffer *buf = g_queue_pop_head (demux->queue); - gst_buffer_unref (buf); + GstBufferList *buf_list = g_queue_pop_head (demux->queue); + gst_buffer_list_unref (buf_list); } g_queue_clear (demux->queue); - gst_adapter_clear (demux->download); GST_M3U8_CLIENT_LOCK (demux->client); GST_DEBUG_OBJECT (demux, "seeking to sequence %d", current_sequence); @@ -425,8 +417,8 @@ gst_hls_demux_src_event (GstPad * pad, GstEvent * event) } demux->cancelled = FALSE; - gst_task_start (demux->task); - g_static_rec_mutex_unlock (&demux->task_lock); + gst_task_start (demux->stream_task); + g_static_rec_mutex_unlock (&demux->stream_lock); return TRUE; } @@ -467,9 +459,7 @@ gst_hls_demux_sink_event (GstPad * pad, GstEvent * event) } gst_query_unref (query); - playlist = gst_hls_src_buf_to_utf8_playlist ((gchar *) - GST_BUFFER_DATA (demux->playlist), GST_BUFFER_SIZE (demux->playlist)); - gst_buffer_unref (demux->playlist); + playlist = gst_hls_src_buf_to_utf8_playlist (demux->playlist); demux->playlist = NULL; if (playlist == NULL) { GST_WARNING_OBJECT (demux, "Error validating first playlist."); @@ -489,7 +479,7 @@ gst_hls_demux_sink_event (GstPad * pad, GstEvent * event) return FALSE; } - gst_task_start (demux->task); + gst_task_start (demux->stream_task); gst_event_unref (event); return TRUE; } @@ -572,27 +562,6 @@ gst_hls_demux_src_query (GstPad * pad, GstQuery * query) return ret; } -static gboolean -gst_hls_demux_fetcher_sink_event (GstPad * pad, GstEvent * event) -{ - GstHLSDemux *demux = GST_HLS_DEMUX (gst_pad_get_element_private (pad)); - - switch (event->type) { - case GST_EVENT_EOS:{ - GST_DEBUG_OBJECT (demux, "Got EOS on the fetcher pad"); - /* signal we have fetched the URI */ - if (!demux->cancelled) { - g_cond_broadcast (demux->fetcher_cond); - } - } - default: - break; - } - - gst_event_unref (event); - return FALSE; -} - static GstFlowReturn gst_hls_demux_chain (GstPad * pad, GstBuffer * buf) { @@ -608,72 +577,19 @@ gst_hls_demux_chain (GstPad * pad, GstBuffer * buf) return GST_FLOW_OK; } -static GstFlowReturn -gst_hls_demux_fetcher_chain (GstPad * pad, GstBuffer * buf) -{ - GstHLSDemux *demux = GST_HLS_DEMUX (gst_pad_get_element_private (pad)); - - /* The source element can be an http source element. In case we get a 404, - * the html response will be sent downstream and the adapter - * will not be null, which might make us think that the request proceed - * successfully. But it will also post an error message in the bus that - * is handled synchronously and that will set demux->fetcher_error to TRUE, - * which is used to discard this buffer with the html response. */ - if (demux->fetcher_error) { - goto done; - } - - gst_adapter_push (demux->download, buf); - -done: - { - return GST_FLOW_OK; - } -} - -static void -gst_hls_demux_stop_fetcher_locked (GstHLSDemux * demux, gboolean cancelled) -{ - GstPad *pad; - - /* When the fetcher is stopped while it's downloading, we will get an EOS that - * unblocks the fetcher thread and tries to stop it again from that thread. - * Here we check if the fetcher as already been stopped before continuing */ - if (demux->fetcher == NULL || demux->stopping_fetcher) - return; - - GST_DEBUG_OBJECT (demux, "Stopping fetcher."); - demux->stopping_fetcher = TRUE; - /* set the element state to NULL */ - gst_element_set_state (demux->fetcher, GST_STATE_NULL); - gst_element_get_state (demux->fetcher, NULL, NULL, GST_CLOCK_TIME_NONE); - /* unlink it from the internal pad */ - pad = gst_pad_get_peer (demux->fetcherpad); - if (pad) { - gst_pad_unlink (pad, demux->fetcherpad); - gst_object_unref (pad); - } - /* and finally unref it */ - gst_object_unref (demux->fetcher); - demux->fetcher = NULL; - - /* if we stopped it to cancell a download, free the cached buffer */ - if (cancelled && gst_adapter_available (demux->download)) { - gst_adapter_clear (demux->download); - } - /* signal the fetcher thread that the download has finished/cancelled */ - if (cancelled) - g_cond_broadcast (demux->fetcher_cond); -} - static void gst_hls_demux_stop (GstHLSDemux * demux) { - g_mutex_lock (demux->fetcher_lock); - gst_hls_demux_stop_fetcher_locked (demux, TRUE); - g_mutex_unlock (demux->fetcher_lock); - gst_task_stop (demux->task); - gst_hls_demux_stop_update (demux); + gst_uri_downloader_cancel (demux->downloader); + + if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) { + demux->stop_stream_task = TRUE; + gst_task_stop (demux->updates_task); + GST_TASK_SIGNAL (demux->updates_task); + } + + if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) + gst_task_stop (demux->stream_task); } static void @@ -681,7 +597,8 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps) { GstPad *oldpad = demux->srcpad; - GST_DEBUG ("Switching pads (oldpad:%p)", oldpad); + GST_DEBUG ("Switching pads (oldpad:%p) with caps: %" GST_PTR_FORMAT, oldpad, + newcaps); /* FIXME: This is a workaround for a bug in playsink. * If we're switching from an audio-only or video-only fragment @@ -717,8 +634,9 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps) } static void -gst_hls_demux_loop (GstHLSDemux * demux) +gst_hls_demux_stream_loop (GstHLSDemux * demux) { + GstBufferList *buffer_list; GstBuffer *buf; GstFlowReturn ret; @@ -734,7 +652,7 @@ gst_hls_demux_loop (GstHLSDemux * demux) /* we can start now the updates thread (only if on playing) */ if (GST_STATE (demux) == GST_STATE_PLAYING) - gst_hls_demux_start_update (demux); + gst_task_start (demux->updates_task); GST_INFO_OBJECT (demux, "First fragments cached successfully"); } @@ -745,8 +663,9 @@ gst_hls_demux_loop (GstHLSDemux * demux) goto pause_task; } - buf = g_queue_pop_head (demux->queue); - + buffer_list = g_queue_pop_head (demux->queue); + /* Work with the first buffer of the list */ + buf = gst_buffer_list_get (buffer_list, 0, 0); /* Figure out if we need to create/switch pads */ if (G_UNLIKELY (!demux->srcpad || GST_BUFFER_CAPS (buf) != GST_PAD_CAPS (demux->srcpad) @@ -769,9 +688,9 @@ gst_hls_demux_loop (GstHLSDemux * demux) if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DURATION (buf))) demux->position += GST_BUFFER_DURATION (buf); - ret = gst_pad_push (demux->srcpad, buf); + ret = gst_pad_push_list (demux->srcpad, buffer_list); if (ret != GST_FLOW_OK) - goto error; + goto error_pushing; return; @@ -785,7 +704,7 @@ end_of_playlist: cache_error: { - gst_task_pause (demux->task); + gst_task_pause (demux->stream_task); if (!demux->cancelled) { GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND, ("Could not cache the first fragments"), (NULL)); @@ -794,70 +713,26 @@ cache_error: return; } -error: +error_pushing: { /* FIXME: handle error */ - GST_DEBUG_OBJECT (demux, "error, stopping task"); + GST_DEBUG_OBJECT (demux, "Error pushing buffer: %s... stopping task", + gst_flow_get_name (ret)); gst_hls_demux_stop (demux); return; } pause_task: { - gst_task_pause (demux->task); + gst_task_pause (demux->stream_task); return; } } -static GstBusSyncReply -gst_hls_demux_fetcher_bus_handler (GstBus * bus, - GstMessage * message, gpointer data) -{ - GstHLSDemux *demux = GST_HLS_DEMUX (data); - - if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) { - demux->fetcher_error = TRUE; - if (!demux->cancelled) { - g_mutex_lock (demux->fetcher_lock); - g_cond_broadcast (demux->fetcher_cond); - g_mutex_unlock (demux->fetcher_lock); - } - } - - gst_message_unref (message); - return GST_BUS_DROP; -} - -static gboolean -gst_hls_demux_make_fetcher_locked (GstHLSDemux * demux, const gchar * uri) -{ - GstPad *pad; - - if (!gst_uri_is_valid (uri)) - return FALSE; - - GST_DEBUG_OBJECT (demux, "Creating fetcher for the URI:%s", uri); - demux->fetcher = gst_element_make_from_uri (GST_URI_SRC, uri, NULL); - if (!demux->fetcher) - return FALSE; - - demux->fetcher_error = FALSE; - demux->stopping_fetcher = FALSE; - gst_element_set_bus (GST_ELEMENT (demux->fetcher), demux->fetcher_bus); - - pad = gst_element_get_static_pad (demux->fetcher, "src"); - if (pad) { - gst_pad_link (pad, demux->fetcherpad); - gst_object_unref (pad); - } - return TRUE; -} - static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose) { demux->need_cache = TRUE; - demux->thread_return = FALSE; demux->accumulated_delay = 0; demux->end_of_playlist = FALSE; demux->cancelled = FALSE; @@ -873,8 +748,6 @@ gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose) demux->playlist = NULL; } - gst_adapter_clear (demux->download); - if (demux->client) { gst_m3u8_client_free (demux->client); demux->client = NULL; @@ -885,8 +758,8 @@ gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose) } while (!g_queue_is_empty (demux->queue)) { - GstBuffer *buf = g_queue_pop_head (demux->queue); - gst_buffer_unref (buf); + GstBufferList *buffer_list = g_queue_pop_head (demux->queue); + gst_buffer_list_unref (buffer_list); } g_queue_clear (demux->queue); @@ -905,8 +778,8 @@ gst_hls_demux_set_location (GstHLSDemux * demux, const gchar * uri) return TRUE; } -static gboolean -gst_hls_demux_update_thread (GstHLSDemux * demux) +void +gst_hls_demux_updates_loop (GstHLSDemux * demux) { /* Loop for the updates. It's started when the first fragments are cached and * schedules the next update of the playlist (for lives sources) and the next @@ -914,15 +787,14 @@ gst_hls_demux_update_thread (GstHLSDemux * demux) * download time with the next scheduled update to check if we can or should * switch to a different bitrate */ - g_mutex_lock (demux->thread_lock); - GST_DEBUG_OBJECT (demux, "Started updates thread"); + /* block until the next scheduled update or the signal to quit this thread */ + g_mutex_lock (demux->updates_timed_lock); + GST_DEBUG_OBJECT (demux, "Started updates task"); while (TRUE) { - /* block until the next scheduled update or the signal to quit this thread */ - if (g_cond_timed_wait (demux->thread_cond, demux->thread_lock, - &demux->next_update)) { + if (g_cond_timed_wait (GST_TASK_GET_COND (demux->updates_task), + demux->updates_timed_lock, &demux->next_update)) { goto quit; } - /* update the playlist for live sources */ if (gst_m3u8_client_is_live (demux->client)) { if (!gst_hls_demux_update_playlist (demux)) { @@ -955,7 +827,7 @@ gst_hls_demux_update_thread (GstHLSDemux * demux) /* fetch the next fragment */ if (g_queue_is_empty (demux->queue)) { - if (!gst_hls_demux_get_next_fragment (demux)) { + if (!gst_hls_demux_get_next_fragment (demux, FALSE)) { if (!demux->end_of_playlist && !demux->cancelled) { demux->client->update_failed_count++; if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) { @@ -978,41 +850,12 @@ gst_hls_demux_update_thread (GstHLSDemux * demux) quit: { - GST_DEBUG_OBJECT (demux, "Stopped updates thread"); - demux->updates_thread = NULL; - g_mutex_unlock (demux->thread_lock); - return TRUE; + GST_DEBUG_OBJECT (demux, "Stopped updates task"); + gst_hls_demux_stop (demux); + g_mutex_unlock (demux->updates_timed_lock); } } - -static void -gst_hls_demux_stop_update (GstHLSDemux * demux) -{ - GST_DEBUG_OBJECT (demux, "Stopping updates thread"); - while (demux->updates_thread) { - g_mutex_lock (demux->thread_lock); - g_cond_signal (demux->thread_cond); - g_mutex_unlock (demux->thread_lock); - } -} - -static gboolean -gst_hls_demux_start_update (GstHLSDemux * demux) -{ - GError *error; - - /* creates a new thread for the updates */ - g_mutex_lock (demux->thread_lock); - if (demux->updates_thread == NULL) { - GST_DEBUG_OBJECT (demux, "Starting updates thread"); - demux->updates_thread = g_thread_create ( - (GThreadFunc) gst_hls_demux_update_thread, demux, FALSE, &error); - } - g_mutex_unlock (demux->thread_lock); - return (error != NULL); -} - static gboolean gst_hls_demux_cache_fragments (GstHLSDemux * demux) { @@ -1065,7 +908,7 @@ gst_hls_demux_cache_fragments (GstHLSDemux * demux) g_time_val_add (&demux->next_update, gst_m3u8_client_get_target_duration (demux->client) / GST_SECOND * G_USEC_PER_SEC); - if (!gst_hls_demux_get_next_fragment (demux)) { + if (!gst_hls_demux_get_next_fragment (demux, TRUE)) { if (demux->end_of_playlist) break; if (!demux->cancelled) @@ -1084,98 +927,58 @@ gst_hls_demux_cache_fragments (GstHLSDemux * demux) demux->need_cache = FALSE; return TRUE; -} -static gboolean -gst_hls_demux_fetch_location (GstHLSDemux * demux, const gchar * uri) -{ - GstStateChangeReturn ret; - gboolean bret = FALSE; - - g_mutex_lock (demux->fetcher_lock); - - while (demux->fetcher) - g_cond_wait (demux->fetcher_cond, demux->fetcher_lock); - - if (demux->cancelled) - goto quit; - - if (!gst_hls_demux_make_fetcher_locked (demux, uri)) { - goto uri_error; - } - - ret = gst_element_set_state (demux->fetcher, GST_STATE_PLAYING); - if (ret == GST_STATE_CHANGE_FAILURE) - goto state_change_error; - - /* wait until we have fetched the uri */ - GST_DEBUG_OBJECT (demux, "Waiting to fetch the URI"); - g_cond_wait (demux->fetcher_cond, demux->fetcher_lock); - - gst_hls_demux_stop_fetcher_locked (demux, FALSE); - - if (!demux->fetcher_error && gst_adapter_available (demux->download)) { - GST_INFO_OBJECT (demux, "URI fetched successfully"); - bret = TRUE; - } - goto quit; - -uri_error: - { - GST_ELEMENT_ERROR (demux, RESOURCE, OPEN_READ, - ("Could not create an element to fetch the given URI."), ("URI: \"%s\"", - uri)); - bret = FALSE; - goto quit; - } - -state_change_error: - { - GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE, - ("Error changing state of the fetcher element."), (NULL)); - bret = FALSE; - goto quit; - } - -quit: - { - /* Unlock any other fetcher that might be waiting */ - g_cond_broadcast (demux->fetcher_cond); - g_mutex_unlock (demux->fetcher_lock); - return bret; - } } static gchar * -gst_hls_src_buf_to_utf8_playlist (gchar * data, guint size) +gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf) { + gint size; + gchar *data; gchar *playlist; + data = (gchar *) GST_BUFFER_DATA (buf); + size = GST_BUFFER_SIZE (buf); + if (!g_utf8_validate (data, size, NULL)) return NULL; /* alloc size + 1 to end with a null character */ playlist = g_malloc0 (size + 1); memcpy (playlist, data, size + 1); + + gst_buffer_unref (buf); return playlist; } static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux) { - const guint8 *data; + GstFragment *download; + GstBufferListIterator *it; + GstBuffer *buf; gchar *playlist; - guint avail; + const gchar *uri = gst_m3u8_client_get_current_uri (demux->client); - GST_INFO_OBJECT (demux, "Updating the playlist %s", uri); - if (!gst_hls_demux_fetch_location (demux, uri)) + download = gst_uri_downloader_fetch_uri (demux->downloader, uri); + + if (download == NULL) return FALSE; - avail = gst_adapter_available (demux->download); - data = gst_adapter_peek (demux->download, avail); - playlist = gst_hls_src_buf_to_utf8_playlist ((gchar *) data, avail); - gst_adapter_clear (demux->download); + /* Merge all the buffers in the list to build a unique buffer with the + * playlist */ + it = gst_buffer_list_iterate (gst_fragment_get_buffer_list (download)); + + /* skip the first group, which contains the headers, which are not set in the + * demuxer*/ + gst_buffer_list_iterator_next_group (it); + buf = gst_buffer_list_iterator_merge_group (it); + + playlist = gst_hls_src_buf_to_utf8_playlist (buf); + gst_buffer_list_iterator_free (it); + g_object_unref (download); + if (playlist == NULL) { GST_WARNING_OBJECT (demux, "Couldn't not validate playlist encoding"); return FALSE; @@ -1303,37 +1106,34 @@ gst_hls_demux_switch_playlist (GstHLSDemux * demux) } static gboolean -gst_hls_demux_get_next_fragment (GstHLSDemux * demux) +gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean caching) { - GstBuffer *buf; - guint avail; + GstFragment *download; const gchar *next_fragment_uri; GstClockTime duration; GstClockTime timestamp; + GstBufferList *buffer_list; + GstBuffer *buf; gboolean discont; if (!gst_m3u8_client_get_next_fragment (demux->client, &discont, &next_fragment_uri, &duration, ×tamp)) { GST_INFO_OBJECT (demux, "This playlist doesn't contain more fragments"); demux->end_of_playlist = TRUE; - gst_task_start (demux->task); + gst_task_start (demux->stream_task); return FALSE; } GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri); - if (!gst_hls_demux_fetch_location (demux, next_fragment_uri)) { - /* FIXME: The gst_m3u8_get_next_fragment increments the sequence number - but another thread might call get_next_fragment and this decrement - will not redownload the failed fragment, but might duplicate the - download of a succeeded fragment - */ - g_atomic_int_add (&demux->client->sequence, -1); - return FALSE; - } + download = gst_uri_downloader_fetch_uri (demux->downloader, + next_fragment_uri); - avail = gst_adapter_available (demux->download); - buf = gst_adapter_take_buffer (demux->download, avail); + if (download == NULL) + goto error; + + buffer_list = gst_fragment_get_buffer_list (download); + buf = gst_buffer_list_get (buffer_list, 0, 0); GST_BUFFER_DURATION (buf) = duration; GST_BUFFER_TIMESTAMP (buf) = timestamp; @@ -1357,8 +1157,17 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux) GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT); } - g_queue_push_tail (demux->queue, buf); - gst_task_start (demux->task); - gst_adapter_clear (demux->download); + g_queue_push_tail (demux->queue, buffer_list); + g_object_unref (download); + if (!caching) { + GST_TASK_SIGNAL (demux->updates_task); + } + return TRUE; + +error: + { + gst_hls_demux_stop (demux); + return FALSE; + } } diff --git a/gst/hls/gsthlsdemux.h b/gst/hls/gsthlsdemux.h index a09a88b769..16c86775ea 100644 --- a/gst/hls/gsthlsdemux.h +++ b/gst/hls/gsthlsdemux.h @@ -27,6 +27,8 @@ #include #include #include "m3u8.h" +#include "gstfragmented.h" +#include "gsturidownloader.h" G_BEGIN_DECLS #define GST_TYPE_HLS_DEMUX \ @@ -51,12 +53,12 @@ struct _GstHLSDemux { GstElement parent; - GstTask *task; - GStaticRecMutex task_lock; GstPad *srcpad; GstPad *sinkpad; + GstBuffer *playlist; GstCaps *input_caps; + GstUriDownloader *downloader; GstM3U8Client *client; /* M3U8 client */ GQueue *queue; /* Queue storing the fetched fragments */ gboolean need_cache; /* Wheter we need to cache some fragments before starting to push data */ @@ -67,25 +69,18 @@ struct _GstHLSDemux guint fragments_cache; /* number of fragments needed to be cached to start playing */ gfloat bitrate_switch_tol; /* tolerance with respect to the fragment duration to switch the bitarate*/ - /* Updates thread */ - GThread *updates_thread; /* Thread handling the playlist and fragments updates */ - GMutex *thread_lock; /* Thread lock */ - GCond *thread_cond; /* Signals the thread to quit */ - gboolean thread_return; /* Instructs the thread to return after the thread_quit condition is meet */ + /* Streaming task */ + GstTask *stream_task; + GStaticRecMutex stream_lock; + gboolean stop_stream_task; + + /* Updates task */ + GstTask *updates_task; + GStaticRecMutex updates_lock; + GMutex *updates_timed_lock; GTimeVal next_update; /* Time of the next update */ gint64 accumulated_delay; /* Delay accumulated fetching fragments, used to decide a playlist switch */ - - /* Fragments fetcher */ - GstElement *fetcher; - GstBus *fetcher_bus; - GstPad *fetcherpad; - GMutex *fetcher_lock; - GCond *fetcher_cond; - GTimeVal *timeout; - gboolean fetcher_error; - gboolean stopping_fetcher; gboolean cancelled; - GstAdapter *download; /* Position in the stream */ GstClockTime position; diff --git a/gst/hls/gsturidownloader.c b/gst/hls/gsturidownloader.c new file mode 100644 index 0000000000..e208efe155 --- /dev/null +++ b/gst/hls/gsturidownloader.c @@ -0,0 +1,347 @@ +/* GStreamer + * Copyright (C) 2011 Andoni Morales Alastruey + * + * gstfragment.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include +#include "gstfragmented.h" +#include "gstfragment.h" +#include "gsturidownloader.h" + +GST_DEBUG_CATEGORY_STATIC (uridownloader_debug); +#define GST_CAT_DEFAULT (uridownloader_debug) + +#define GST_URI_DOWNLOADER_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), \ + GST_TYPE_URI_DOWNLOADER, GstUriDownloaderPrivate)) + +struct _GstUriDownloaderPrivate +{ + /* Fragments fetcher */ + GstElement *urisrc; + GstBus *bus; + GstPad *pad; + GTimeVal *timeout; + GstFragment *download; + GMutex *lock; + GCond *cond; +}; + +static void gst_uri_downloader_finalize (GObject * object); +static void gst_uri_downloader_dispose (GObject * object); + +static GstFlowReturn gst_uri_downloader_chain (GstPad * pad, GstBuffer * buf); +static gboolean gst_uri_downloader_sink_event (GstPad * pad, GstEvent * event); +static GstBusSyncReply gst_uri_downloader_bus_handler (GstBus * bus, + GstMessage * message, gpointer data); + +static GstStaticPadTemplate sinkpadtemplate = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +#define _do_init \ +{ \ + GST_DEBUG_CATEGORY_INIT (uridownloader_debug, "uridownloader", 0, "URI downloader"); \ +} + +G_DEFINE_TYPE_WITH_CODE (GstUriDownloader, gst_uri_downloader, GST_TYPE_OBJECT, + _do_init); + +static void +gst_uri_downloader_class_init (GstUriDownloaderClass * klass) +{ + GObjectClass *gobject_class; + + gobject_class = (GObjectClass *) klass; + + g_type_class_add_private (klass, sizeof (GstUriDownloaderPrivate)); + + gobject_class->dispose = gst_uri_downloader_dispose; + gobject_class->finalize = gst_uri_downloader_finalize; +} + +static void +gst_uri_downloader_init (GstUriDownloader * downloader) +{ + downloader->priv = GST_URI_DOWNLOADER_GET_PRIVATE (downloader); + + /* Initialize the sink pad. This pad will be connected to the src pad of the + * element created with gst_element_make_from_uri and will handle the download */ + downloader->priv->pad = + gst_pad_new_from_static_template (&sinkpadtemplate, "sink"); + gst_pad_set_chain_function (downloader->priv->pad, + GST_DEBUG_FUNCPTR (gst_uri_downloader_chain)); + gst_pad_set_event_function (downloader->priv->pad, + GST_DEBUG_FUNCPTR (gst_uri_downloader_sink_event)); + gst_pad_set_element_private (downloader->priv->pad, downloader); + gst_pad_activate_push (downloader->priv->pad, TRUE); + + /* Create a bus to handle error and warning message from the source element */ + downloader->priv->bus = gst_bus_new (); + + downloader->priv->lock = g_mutex_new (); + downloader->priv->cond = g_cond_new (); +} + +static void +gst_uri_downloader_dispose (GObject * object) +{ + GstUriDownloader *downloader = GST_URI_DOWNLOADER (object); + + if (downloader->priv->urisrc != NULL) { + gst_object_unref (downloader->priv->urisrc); + downloader->priv->urisrc = NULL; + } + + if (downloader->priv->bus != NULL) { + gst_object_unref (downloader->priv->bus); + downloader->priv->bus = NULL; + } + + if (downloader->priv->pad) { + gst_object_unref (downloader->priv->pad); + downloader->priv->pad = NULL; + } + + if (downloader->priv->download) { + g_object_unref (downloader->priv->download); + downloader->priv->download = NULL; + } + + G_OBJECT_CLASS (gst_uri_downloader_parent_class)->dispose (object); +} + +static void +gst_uri_downloader_finalize (GObject * object) +{ + GstUriDownloader *downloader = GST_URI_DOWNLOADER (object); + + g_mutex_free (downloader->priv->lock); + g_cond_free (downloader->priv->cond); + + G_OBJECT_CLASS (gst_uri_downloader_parent_class)->finalize (object); +} + +GstUriDownloader * +gst_uri_downloader_new (void) +{ + return g_object_new (GST_TYPE_URI_DOWNLOADER, NULL); +} + +static gboolean +gst_uri_downloader_sink_event (GstPad * pad, GstEvent * event) +{ + GstUriDownloader *downloader = + (GstUriDownloader *) (gst_pad_get_element_private (pad)); + + switch (event->type) { + case GST_EVENT_EOS:{ + GST_OBJECT_LOCK (downloader); + GST_DEBUG_OBJECT (downloader, "Got EOS on the fetcher pad"); + if (downloader->priv->download != NULL) { + /* signal we have fetched the URI */ + downloader->priv->download->completed = TRUE; + downloader->priv->download->download_stop_time = g_get_real_time (); + GST_OBJECT_UNLOCK (downloader); + GST_DEBUG_OBJECT (downloader, "Signaling chain funtion"); + g_cond_signal (downloader->priv->cond); + + } else { + GST_OBJECT_UNLOCK (downloader); + } + break; + } + default: + break; + } + + gst_event_unref (event); + return FALSE; +} + +static GstBusSyncReply +gst_uri_downloader_bus_handler (GstBus * bus, + GstMessage * message, gpointer data) +{ + GstUriDownloader *downloader = (GstUriDownloader *) (data); + + if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR || + GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) { + GError *err = NULL; + gchar *dbg_info = NULL; + + gst_message_parse_error (message, &err, &dbg_info); + GST_WARNING_OBJECT (downloader, + "Received error: %s from %s, the download will be cancelled", + GST_OBJECT_NAME (message->src), err->message); + GST_DEBUG ("Debugging info: %s\n", (dbg_info) ? dbg_info : "none"); + g_error_free (err); + g_free (dbg_info); + + /* remove the sync handler to avoid duplicated messages */ + gst_bus_set_sync_handler (downloader->priv->bus, NULL, NULL); + gst_uri_downloader_cancel (downloader); + } + + gst_message_unref (message); + return GST_BUS_DROP; +} + +static GstFlowReturn +gst_uri_downloader_chain (GstPad * pad, GstBuffer * buf) +{ + GstUriDownloader *downloader = + (GstUriDownloader *) gst_pad_get_element_private (pad); + + /* HTML errors (404, 500, etc...) are also pushed through this pad as + * response but the source element will also post a warning or error message + * in the bus, which is handled synchronously cancelling the download. + */ + GST_OBJECT_LOCK (downloader); + if (downloader->priv->download == NULL) { + /* Download cancelled, quit */ + GST_OBJECT_UNLOCK (downloader); + goto done; + } + + GST_LOG_OBJECT (downloader, "The uri fetcher received a new buffer " + "of size %u", GST_BUFFER_SIZE (buf)); + if (!gst_fragment_add_buffer (downloader->priv->download, buf)) + GST_WARNING_OBJECT (downloader, "Could not add buffer to fragment"); + GST_OBJECT_UNLOCK (downloader); + +done: + { + return GST_FLOW_OK; + } +} + +static void +gst_uri_downloader_stop (GstUriDownloader * downloader) +{ + GstPad *pad; + + GST_DEBUG_OBJECT (downloader, "Stopping source element"); + + /* remove the bus' sync handler */ + gst_bus_set_sync_handler (downloader->priv->bus, NULL, NULL); + /* unlink the source element from the internal pad */ + pad = gst_pad_get_peer (downloader->priv->pad); + if (pad) { + gst_pad_unlink (pad, downloader->priv->pad); + gst_object_unref (pad); + } + /* set the element state to NULL */ + gst_element_set_state (downloader->priv->urisrc, GST_STATE_NULL); + gst_element_get_state (downloader->priv->urisrc, NULL, NULL, + GST_CLOCK_TIME_NONE); +} + +void +gst_uri_downloader_cancel (GstUriDownloader * downloader) +{ + GST_OBJECT_LOCK (downloader); + if (downloader->priv->download != NULL) { + GST_DEBUG_OBJECT (downloader, "Cancelling download"); + g_object_unref (downloader->priv->download); + downloader->priv->download = NULL; + GST_OBJECT_UNLOCK (downloader); + GST_DEBUG_OBJECT (downloader, "Signaling chain funtion"); + g_cond_signal (downloader->priv->cond); + } else { + GST_OBJECT_UNLOCK (downloader); + GST_DEBUG_OBJECT (downloader, + "Trying to cancell a download that was alredy cancelled"); + } +} + +static gboolean +gst_uri_downloader_set_uri (GstUriDownloader * downloader, const gchar * uri) +{ + GstPad *pad; + + if (!gst_uri_is_valid (uri)) + return FALSE; + + GST_DEBUG_OBJECT (downloader, "Creating source element for the URI:%s", uri); + downloader->priv->urisrc = gst_element_make_from_uri (GST_URI_SRC, uri, NULL); + if (!downloader->priv->urisrc) + return FALSE; + + /* add a sync handler for the bus messages to detect errors in the download */ + gst_element_set_bus (GST_ELEMENT (downloader->priv->urisrc), + downloader->priv->bus); + gst_bus_set_sync_handler (downloader->priv->bus, + gst_uri_downloader_bus_handler, downloader); + + pad = gst_element_get_static_pad (downloader->priv->urisrc, "src"); + if (!pad) + return FALSE; + gst_pad_link (pad, downloader->priv->pad); + gst_object_unref (pad); + return TRUE; +} + +GstFragment * +gst_uri_downloader_fetch_uri (GstUriDownloader * downloader, const gchar * uri) +{ + GstStateChangeReturn ret; + GstFragment *download = NULL; + + g_mutex_lock (downloader->priv->lock); + + if (!gst_uri_downloader_set_uri (downloader, uri)) { + goto quit; + } + + downloader->priv->download = gst_fragment_new (); + + ret = gst_element_set_state (downloader->priv->urisrc, GST_STATE_PLAYING); + if (ret == GST_STATE_CHANGE_FAILURE) { + g_object_unref (downloader->priv->download); + downloader->priv->download = NULL; + goto quit; + } + + /* wait until: + * - the download succeed (EOS in the src pad) + * - the download failed (Error message on the fetcher bus) + * - the download was canceled + */ + GST_DEBUG_OBJECT (downloader, "Waiting to fetch the URI"); + g_cond_wait (downloader->priv->cond, downloader->priv->lock); + + GST_OBJECT_LOCK (downloader); + download = downloader->priv->download; + downloader->priv->download = NULL; + GST_OBJECT_UNLOCK (downloader); + + if (download != NULL) + GST_INFO_OBJECT (downloader, "URI fetched successfully"); + else + GST_INFO_OBJECT (downloader, "Error fetching URI"); + +quit: + { + gst_uri_downloader_stop (downloader); + g_mutex_unlock (downloader->priv->lock); + return download; + } +} diff --git a/gst/hls/gsturidownloader.h b/gst/hls/gsturidownloader.h new file mode 100644 index 0000000000..bfb5157507 --- /dev/null +++ b/gst/hls/gsturidownloader.h @@ -0,0 +1,64 @@ +/* GStreamer + * Copyright (C) 2011 Andoni Morales Alastruey + * + * gsturidownloader.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * Youshould have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GSTURI_DOWNLOADER_H__ +#define __GSTURI_DOWNLOADER_H__ + +#include +#include +#include "gstfragment.h" + +G_BEGIN_DECLS + +#define GST_TYPE_URI_DOWNLOADER (gst_uri_downloader_get_type()) +#define GST_URI_DOWNLOADER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_URI_DOWNLOADER,GstUriDownloader)) +#define GST_URI_DOWNLOADER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_URI_DOWNLOADER,GstUriDownloaderClass)) +#define GST_IS_URI_DOWNLOADER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_URI_DOWNLOADER)) +#define GST_IS_URI_DOWNLOADER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_URI_DOWNLOADER)) + +typedef struct _GstUriDownloader GstUriDownloader; +typedef struct _GstUriDownloaderPrivate GstUriDownloaderPrivate; +typedef struct _GstUriDownloaderClass GstUriDownloaderClass; + +struct _GstUriDownloader +{ + GstObject parent; + + GstUriDownloaderPrivate *priv; +}; + +struct _GstUriDownloaderClass +{ + GstObjectClass parent_class; + + /*< private >*/ + gpointer _gst_reserved[GST_PADDING]; +}; + +GType gst_uri_downloader_get_type (void); + +GstUriDownloader * gst_uri_downloader_new (void); +GstFragment * gst_uri_downloader_fetch_uri (GstUriDownloader * downloader, const gchar * uri); +void gst_uri_downloader_cancel (GstUriDownloader *downloader); +void gst_uri_downloader_free (GstUriDownloader *downloader); + +G_END_DECLS +#endif /* __GSTURIDOWNLOADER_H__ */