dashdemux: Properly stop download and stream tasks where appropriate.

This commit is contained in:
Andre Moreira Magalhaes (andrunko) 2013-02-04 22:58:32 -02:00 committed by Thiago Santos
parent 288903a203
commit e92531c8f6
3 changed files with 140 additions and 102 deletions

View file

@ -276,23 +276,15 @@ gst_dash_demux_dispose (GObject * obj)
{
GstDashDemux *demux = GST_DASH_DEMUX (obj);
gst_dash_demux_reset (demux, TRUE);
if (demux->stream_task) {
if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
GST_DEBUG_OBJECT (demux, "Leaving streaming task");
gst_task_stop (demux->stream_task);
gst_task_join (demux->stream_task);
}
gst_object_unref (demux->stream_task);
g_static_rec_mutex_free (&demux->stream_lock);
demux->stream_task = NULL;
}
if (demux->download_task) {
if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
GST_DEBUG_OBJECT (demux, "Leaving download task");
gst_task_stop (demux->download_task);
gst_task_join (demux->download_task);
}
gst_object_unref (demux->download_task);
g_static_rec_mutex_free (&demux->download_lock);
demux->download_task = NULL;
@ -303,8 +295,6 @@ gst_dash_demux_dispose (GObject * obj)
demux->downloader = NULL;
}
gst_dash_demux_reset (demux, TRUE);
G_OBJECT_CLASS (parent_class)->dispose (obj);
}
@ -438,9 +428,11 @@ gst_dash_demux_change_state (GstElement * element, GstStateChange transition)
GstDashDemux *demux = GST_DASH_DEMUX (element);
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_dash_demux_reset (demux, FALSE);
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
/* Start the streaming loop in paused only if we already received
the manifest. It might have been stopped if we were in PAUSED
@ -460,10 +452,6 @@ gst_dash_demux_change_state (GstElement * element, GstStateChange transition)
gst_dash_demux_pause_stream_task (demux);
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
demux->cancelled = TRUE;
gst_dash_demux_stop (demux);
gst_task_join (demux->stream_task);
gst_task_join (demux->download_task);
break;
default:
break;
@ -471,18 +459,6 @@ gst_dash_demux_change_state (GstElement * element, GstStateChange transition)
return ret;
}
static void
gst_dash_demux_clear_queues (GstDashDemux * demux)
{
GSList *iter;
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
gst_data_queue_flush (stream->queue);
}
}
static gboolean
_check_queue_full (GstDataQueue * q, guint visible, guint bytes, guint64 time,
GstDashDemuxStream * stream)
@ -531,7 +507,6 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
GstDashDemux *demux;
demux = GST_DASH_DEMUX (gst_pad_get_element_private (pad));
GST_WARNING_OBJECT (demux, "Received an event");
switch (event->type) {
case GST_EVENT_SEEK:
@ -584,15 +559,12 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
/* Stop the demux */
demux->cancelled = TRUE;
/* Stop the demux, also clears the buffering queue */
gst_dash_demux_stop (demux);
/* Wait for streaming to finish */
g_static_rec_mutex_lock (&demux->stream_lock);
/* Clear the buffering queue */
/* FIXME: allow seeking in the buffering queue */
gst_dash_demux_clear_queues (demux);
//GST_MPD_CLIENT_LOCK (demux->client);
/* select the requested Period in the Media Presentation */
@ -672,6 +644,7 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
stream->need_segment = TRUE;
gst_data_queue_set_flushing (stream->queue, FALSE);
}
gst_uri_downloader_reset (demux->downloader);
gst_dash_demux_resume_download_task (demux);
gst_dash_demux_resume_stream_task (demux);
g_static_rec_mutex_unlock (&demux->stream_lock);
@ -754,9 +727,12 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
static gboolean
gst_dash_demux_sink_event (GstPad * pad, GstEvent * event)
{
GstDashDemux *demux = GST_DASH_DEMUX (gst_pad_get_parent (pad));
GstDashDemux *demux = GST_DASH_DEMUX (GST_PAD_PARENT (pad));
switch (event->type) {
case GST_EVENT_FLUSH_STOP:
gst_dash_demux_reset (demux, FALSE);
break;
case GST_EVENT_EOS:{
gchar *manifest;
GstQuery *query;
@ -913,15 +889,13 @@ gst_dash_demux_src_query (GstPad * pad, GstQuery * query)
static GstFlowReturn
gst_dash_demux_pad (GstPad * pad, GstBuffer * buf)
{
GstDashDemux *demux = GST_DASH_DEMUX (gst_pad_get_parent (pad));
GstDashDemux *demux = GST_DASH_DEMUX (GST_PAD_PARENT (pad));
if (demux->manifest == NULL)
demux->manifest = buf;
else
demux->manifest = gst_buffer_join (demux->manifest, buf);
gst_object_unref (demux);
return GST_FLOW_OK;
}
@ -930,7 +904,9 @@ gst_dash_demux_stop (GstDashDemux * demux)
{
GSList *iter;
gst_uri_downloader_cancel (demux->downloader);
if (demux->downloader)
gst_uri_downloader_cancel (demux->downloader);
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
@ -940,10 +916,22 @@ gst_dash_demux_stop (GstDashDemux * demux)
if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (demux->download_task);
gst_task_stop (demux->download_task);
g_static_rec_mutex_lock (&demux->download_lock);
g_static_rec_mutex_unlock (&demux->download_lock);
gst_task_join (demux->download_task);
}
if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (demux->stream_task);
gst_task_stop (demux->stream_task);
g_static_rec_mutex_lock (&demux->stream_lock);
g_static_rec_mutex_unlock (&demux->stream_lock);
gst_task_join (demux->stream_task);
}
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
gst_data_queue_flush (stream->queue);
}
}
@ -1082,8 +1070,10 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
eos = FALSE;
eop = FALSE;
if (!gst_data_queue_peek (stream->queue, &item))
continue;
if (!gst_data_queue_peek (stream->queue, &item)) {
/* flushing */
goto flushing;
}
if (G_LIKELY (GST_IS_BUFFER (item->object))) {
pad_switch = FALSE;
@ -1170,8 +1160,16 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
}
end:
GST_INFO_OBJECT (demux, "Leaving streaming task");
return;
flushing:
{
GST_WARNING_OBJECT (demux, "Flushing, leaving streaming task");
gst_task_stop (demux->stream_task);
return;
}
end_of_manifest:
{
GST_INFO_OBJECT (demux, "Reached end of manifest, sending EOS");
@ -1190,7 +1188,7 @@ error_pushing:
GST_ERROR_OBJECT (demux,
"Error pushing buffer: %s... terminating the demux",
gst_flow_get_name (ret));
gst_dash_demux_stop (demux);
gst_task_stop (demux->stream_task);
return;
}
}
@ -1198,13 +1196,18 @@ error_pushing:
static void
gst_dash_demux_stream_free (GstDashDemuxStream * stream)
{
if (stream->input_caps)
if (stream->input_caps) {
gst_caps_unref (stream->input_caps);
if (stream->pad)
stream->input_caps = NULL;
}
if (stream->pad) {
gst_object_unref (stream->pad);
/* TODO flush the queue */
g_object_unref (stream->queue);
stream->pad = NULL;
}
if (stream->queue) {
g_object_unref (stream->queue);
stream->queue = NULL;
}
g_free (stream);
}
@ -1216,12 +1219,16 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
demux->end_of_period = FALSE;
demux->end_of_manifest = FALSE;
demux->cancelled = FALSE;
gst_dash_demux_clear_queues (demux);
demux->cancelled = TRUE;
gst_dash_demux_stop (demux);
if (demux->downloader)
gst_uri_downloader_reset (demux->downloader);
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
if (stream->pad)
gst_element_remove_pad (GST_ELEMENT (demux), stream->pad);
gst_dash_demux_stream_free (stream);
}
g_slist_free (demux->streams);
@ -1243,6 +1250,7 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
demux->last_manifest_update = GST_CLOCK_TIME_NONE;
demux->position = 0;
demux->position_shift = 0;
demux->cancelled = FALSE;
}
static GstClockTime
@ -1432,6 +1440,8 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
} else {
goto error_downloading;
}
} else if (demux->cancelled) {
goto cancelled;
} else {
goto quit;
}
@ -1441,13 +1451,18 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
demux->client->update_failed_count = 0;
quit:
return;
cancelled:
{
GST_WARNING_OBJECT (demux, "Cancelled, leaving download task");
gst_task_stop (demux->download_task);
return;
}
end_of_manifest:
{
GST_INFO_OBJECT (demux, "Stopped download task");
GST_INFO_OBJECT (demux, "End of manifest, leaving download task");
gst_task_stop (demux->download_task);
return;
}
@ -1455,8 +1470,8 @@ end_of_manifest:
error_downloading:
{
GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
("Could not fetch the next fragment"), (NULL));
gst_dash_demux_stop (demux);
("Could not fetch the next fragment, leaving download task"), (NULL));
gst_task_stop (demux->download_task);
return;
}
}
@ -1775,18 +1790,18 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux)
if (download == NULL)
return FALSE;
buffer = gst_fragment_get_buffer (download);
active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx);
if (active_stream == NULL) /* TODO unref fragments */
return FALSE;
buffer = gst_fragment_get_buffer (download);
if (selected_stream->need_header) {
/* We need to fetch a new header */
if ((header =
gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) {
GST_INFO_OBJECT (demux, "Unable to fetch header");
GST_WARNING_OBJECT (demux, "Unable to fetch header");
} else {
GstBuffer *header_buffer;
/* Replace fragment with a new one including the header */

View file

@ -19,8 +19,6 @@
* Boston, MA 02111-1307, USA.
*/
#define GLIB_DISABLE_DEPRECATION_WARNINGS
#include <glib.h>
#include "gstfragmented.h"
#include "gstfragment.h"
@ -43,6 +41,7 @@ struct _GstUriDownloaderPrivate
GstFragment *download;
GMutex *lock;
GCond *cond;
gboolean cancelled;
};
static void gst_uri_downloader_finalize (GObject * object);
@ -93,7 +92,7 @@ gst_uri_downloader_init (GstUriDownloader * downloader)
gst_pad_set_event_function (downloader->priv->pad,
GST_DEBUG_FUNCPTR (gst_uri_downloader_sink_event));
gst_pad_set_element_private (downloader->priv->pad, downloader);
gst_pad_activate_push (downloader->priv->pad, TRUE);
gst_pad_set_active (downloader->priv->pad, TRUE);
/* Create a bus to handle error and warning message from the source element */
downloader->priv->bus = gst_bus_new ();
@ -150,32 +149,33 @@ gst_uri_downloader_new (void)
static gboolean
gst_uri_downloader_sink_event (GstPad * pad, GstEvent * event)
{
GstUriDownloader *downloader =
(GstUriDownloader *) (gst_pad_get_element_private (pad));
gboolean ret = FALSE;
GstUriDownloader *downloader;
downloader = GST_URI_DOWNLOADER (gst_pad_get_element_private (pad));
switch (event->type) {
case GST_EVENT_EOS:{
GST_OBJECT_LOCK (downloader);
g_mutex_lock (downloader->priv->lock);
GST_DEBUG_OBJECT (downloader, "Got EOS on the fetcher pad");
if (downloader->priv->download != NULL) {
/* signal we have fetched the URI */
downloader->priv->download->completed = TRUE;
downloader->priv->download->download_stop_time = g_get_real_time ();
GST_OBJECT_UNLOCK (downloader);
downloader->priv->download->download_stop_time =
gst_util_get_timestamp ();
GST_DEBUG_OBJECT (downloader, "Signaling chain funtion");
g_cond_signal (downloader->priv->cond);
} else {
GST_OBJECT_UNLOCK (downloader);
}
g_mutex_unlock (downloader->priv->lock);
gst_event_unref (event);
break;
}
default:
ret = gst_pad_event_default (pad, event);
break;
}
gst_event_unref (event);
return FALSE;
return ret;
}
static GstBusSyncReply
@ -186,9 +186,17 @@ gst_uri_downloader_bus_handler (GstBus * bus,
if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR ||
GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
GError *err = NULL;
gchar *dbg_info = NULL;
gst_message_parse_error (message, &err, &dbg_info);
GST_WARNING_OBJECT (downloader,
"Received error in bus from the source element, "
"the download will be cancelled");
"Received error: %s from %s, the download will be cancelled",
GST_OBJECT_NAME (message->src), err->message);
GST_DEBUG ("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
g_error_free (err);
g_free (dbg_info);
/* remove the sync handler to avoid duplicated messages */
gst_bus_set_sync_handler (downloader->priv->bus, NULL, NULL);
gst_uri_downloader_cancel (downloader);
@ -201,29 +209,28 @@ gst_uri_downloader_bus_handler (GstBus * bus,
static GstFlowReturn
gst_uri_downloader_chain (GstPad * pad, GstBuffer * buf)
{
GstUriDownloader *downloader =
(GstUriDownloader *) gst_pad_get_element_private (pad);
GstUriDownloader *downloader;
downloader = GST_URI_DOWNLOADER (gst_pad_get_element_private (pad));
/* HTML errors (404, 500, etc...) are also pushed through this pad as
* response but the source element will also post a warning or error message
* in the bus, which is handled synchronously cancelling the download.
*/
GST_OBJECT_LOCK (downloader);
g_mutex_lock (downloader->priv->lock);
if (downloader->priv->download == NULL) {
/* Download cancelled, quit */
GST_OBJECT_UNLOCK (downloader);
goto done;
}
GST_LOG_OBJECT (downloader,
"The uri fetcher received a new buffer of size %u",
GST_BUFFER_SIZE (buf));
GST_LOG_OBJECT (downloader, "The uri fetcher received a new buffer "
"of size %u", GST_BUFFER_SIZE (buf));
if (!gst_fragment_add_buffer (downloader->priv->download, buf))
GST_WARNING_OBJECT (downloader, "Could not add buffer to fragment");
GST_OBJECT_UNLOCK (downloader);
done:
{
g_mutex_unlock (downloader->priv->lock);
return GST_FLOW_OK;
}
}
@ -249,21 +256,40 @@ gst_uri_downloader_stop (GstUriDownloader * downloader)
GST_CLOCK_TIME_NONE);
}
void
gst_uri_downloader_reset (GstUriDownloader * downloader)
{
g_return_if_fail (downloader != NULL);
g_mutex_lock (downloader->priv->lock);
downloader->priv->cancelled = FALSE;
g_mutex_unlock (downloader->priv->lock);
}
void
gst_uri_downloader_cancel (GstUriDownloader * downloader)
{
GST_OBJECT_LOCK (downloader);
g_return_if_fail (downloader != NULL);
g_mutex_lock (downloader->priv->lock);
if (downloader->priv->download != NULL) {
GST_DEBUG_OBJECT (downloader, "Cancelling download");
g_object_unref (downloader->priv->download);
downloader->priv->download = NULL;
GST_OBJECT_UNLOCK (downloader);
downloader->priv->cancelled = TRUE;
GST_DEBUG_OBJECT (downloader, "Signaling chain funtion");
g_cond_signal (downloader->priv->cond);
g_mutex_unlock (downloader->priv->lock);
} else {
GST_OBJECT_UNLOCK (downloader);
GST_DEBUG_OBJECT (downloader,
"Trying to cancell a download that was alredy cancelled");
gboolean cancelled;
cancelled = downloader->priv->cancelled;
downloader->priv->cancelled = TRUE;
g_mutex_unlock (downloader->priv->lock);
if (cancelled)
GST_DEBUG_OBJECT (downloader,
"Trying to cancel a download that was already cancelled");
}
}
@ -272,23 +298,15 @@ gst_uri_downloader_set_uri (GstUriDownloader * downloader, const gchar * uri)
{
GstPad *pad;
g_return_val_if_fail (downloader != NULL, FALSE);
if (!gst_uri_is_valid (uri))
return FALSE;
if (downloader->priv->urisrc == NULL) {
GST_DEBUG_OBJECT (downloader, "Creating source element for the URI:%s",
uri);
downloader->priv->urisrc =
gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
if (!downloader->priv->urisrc)
return FALSE;
} else {
GST_DEBUG_OBJECT (downloader,
"Reusing existing source element for the URI:%s", uri);
if (!gst_uri_handler_set_uri (GST_URI_HANDLER (downloader->priv->urisrc),
uri))
return FALSE;
}
GST_DEBUG_OBJECT (downloader, "Creating source element for the URI:%s", uri);
downloader->priv->urisrc = gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
if (!downloader->priv->urisrc)
return FALSE;
/* add a sync handler for the bus messages to detect errors in the download */
gst_element_set_bus (GST_ELEMENT (downloader->priv->urisrc),
@ -330,18 +348,21 @@ gst_uri_downloader_fetch_uri (GstUriDownloader * downloader, const gchar * uri)
* - the download failed (Error message on the fetcher bus)
* - the download was canceled
*/
GST_DEBUG_OBJECT (downloader, "Waiting to fetch the URI");
GST_DEBUG_OBJECT (downloader, "Waiting to fetch the URI %s", uri);
if (downloader->priv->cancelled) {
g_object_unref (downloader->priv->download);
downloader->priv->download = NULL;
goto quit;
}
g_cond_wait (downloader->priv->cond, downloader->priv->lock);
GST_OBJECT_LOCK (downloader);
download = downloader->priv->download;
downloader->priv->download = NULL;
GST_OBJECT_UNLOCK (downloader);
if (download != NULL)
GST_INFO_OBJECT (downloader, "URI fetched successfully");
GST_INFO_OBJECT (downloader, "URI %s fetched successfully", uri);
else
GST_INFO_OBJECT (downloader, "Error fetching URI");
GST_INFO_OBJECT (downloader, "Error fetching URI %s", uri);
quit:
{

View file

@ -57,8 +57,10 @@ GType gst_uri_downloader_get_type (void);
GstUriDownloader * gst_uri_downloader_new (void);
GstFragment * gst_uri_downloader_fetch_uri (GstUriDownloader * downloader, const gchar * uri);
void gst_uri_downloader_reset (GstUriDownloader *downloader);
void gst_uri_downloader_cancel (GstUriDownloader *downloader);
G_END_DECLS
#endif /* __GSTURIDOWNLOADER_H__ */