gstreamer/gst-libs/gst/uridownloader/gsturidownloader.c
Carlos Garcia Campos 429353337d uridownloader: Take the ownership of the src element
The URI downloader is creating the source element with
gst_element_factory_make() that returns a floating reference that nobody
is consuming. This is causing problems in WebKit, where the smart
pointers used to take references of the source elment get confused and
end up consuming the floating reference and then releasing the element,
which usually crashes because the URI downloader still tries to use its
src element. See https://bugs.webkit.org/show_bug.cgi?id=144040.
This commit adds two helper functions to ensure and destroy the source
element, to make the code simpler and less error prone. The ensure
method takes care of checking if we can reuse the existing one or we
need to create a new one, taking always its ownership. The destroy
method simply avoids duplicated code to set the source to NULL state and
then unref it.

https://bugzilla.gnome.org/show_bug.cgi?id=766053
2016-05-06 15:26:59 +02:00

669 lines
20 KiB
C

/* GStreamer
* Copyright (C) 2011 Andoni Morales Alastruey <ylatuya@gmail.com>
*
* gstfragment.c:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <glib.h>
#include "gstfragment.h"
#include "gsturidownloader.h"
#include "gsturidownloader_debug.h"
#define GST_CAT_DEFAULT uridownloader_debug
GST_DEBUG_CATEGORY (uridownloader_debug);
#define GST_URI_DOWNLOADER_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), \
GST_TYPE_URI_DOWNLOADER, GstUriDownloaderPrivate))
struct _GstUriDownloaderPrivate
{
/* Fragments fetcher */
GstElement *urisrc;
GstBus *bus;
GstPad *pad;
GTimeVal *timeout;
GstFragment *download;
gboolean got_buffer;
GMutex download_lock; /* used to restrict to one download only */
GError *err;
GCond cond;
gboolean cancelled;
};
static void gst_uri_downloader_finalize (GObject * object);
static void gst_uri_downloader_dispose (GObject * object);
static GstFlowReturn gst_uri_downloader_chain (GstPad * pad, GstObject * parent,
GstBuffer * buf);
static gboolean gst_uri_downloader_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static GstBusSyncReply gst_uri_downloader_bus_handler (GstBus * bus,
GstMessage * message, gpointer data);
static gboolean gst_uri_downloader_ensure_src (GstUriDownloader * downloader,
const gchar * uri);
static void gst_uri_downloader_destroy_src (GstUriDownloader * downloader);
static GstStaticPadTemplate sinkpadtemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
#define _do_init \
{ \
GST_DEBUG_CATEGORY_INIT (uridownloader_debug, "uridownloader", 0, "URI downloader"); \
}
G_DEFINE_TYPE_WITH_CODE (GstUriDownloader, gst_uri_downloader, GST_TYPE_OBJECT,
_do_init);
static void
gst_uri_downloader_class_init (GstUriDownloaderClass * klass)
{
GObjectClass *gobject_class;
gobject_class = (GObjectClass *) klass;
g_type_class_add_private (klass, sizeof (GstUriDownloaderPrivate));
gobject_class->dispose = gst_uri_downloader_dispose;
gobject_class->finalize = gst_uri_downloader_finalize;
}
static void
gst_uri_downloader_init (GstUriDownloader * downloader)
{
downloader->priv = GST_URI_DOWNLOADER_GET_PRIVATE (downloader);
/* Initialize the sink pad. This pad will be connected to the src pad of the
* element created with gst_element_make_from_uri and will handle the download */
downloader->priv->pad =
gst_pad_new_from_static_template (&sinkpadtemplate, "sink");
gst_pad_set_chain_function (downloader->priv->pad,
GST_DEBUG_FUNCPTR (gst_uri_downloader_chain));
gst_pad_set_event_function (downloader->priv->pad,
GST_DEBUG_FUNCPTR (gst_uri_downloader_sink_event));
gst_pad_set_element_private (downloader->priv->pad, downloader);
gst_pad_set_active (downloader->priv->pad, TRUE);
/* Create a bus to handle error and warning message from the source element */
downloader->priv->bus = gst_bus_new ();
g_mutex_init (&downloader->priv->download_lock);
g_cond_init (&downloader->priv->cond);
}
static void
gst_uri_downloader_dispose (GObject * object)
{
GstUriDownloader *downloader = GST_URI_DOWNLOADER (object);
gst_uri_downloader_destroy_src (downloader);
if (downloader->priv->bus != NULL) {
gst_object_unref (downloader->priv->bus);
downloader->priv->bus = NULL;
}
if (downloader->priv->pad) {
gst_object_unref (downloader->priv->pad);
downloader->priv->pad = NULL;
}
if (downloader->priv->download) {
g_object_unref (downloader->priv->download);
downloader->priv->download = NULL;
}
G_OBJECT_CLASS (gst_uri_downloader_parent_class)->dispose (object);
}
static void
gst_uri_downloader_finalize (GObject * object)
{
GstUriDownloader *downloader = GST_URI_DOWNLOADER (object);
g_mutex_clear (&downloader->priv->download_lock);
g_cond_clear (&downloader->priv->cond);
G_OBJECT_CLASS (gst_uri_downloader_parent_class)->finalize (object);
}
GstUriDownloader *
gst_uri_downloader_new (void)
{
return g_object_new (GST_TYPE_URI_DOWNLOADER, NULL);
}
static gboolean
gst_uri_downloader_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event)
{
gboolean ret = FALSE;
GstUriDownloader *downloader;
downloader = GST_URI_DOWNLOADER (gst_pad_get_element_private (pad));
switch (event->type) {
case GST_EVENT_EOS:{
GST_OBJECT_LOCK (downloader);
GST_DEBUG_OBJECT (downloader, "Got EOS on the fetcher pad");
if (downloader->priv->download != NULL) {
/* signal we have fetched the URI */
downloader->priv->download->completed = TRUE;
downloader->priv->download->download_stop_time =
gst_util_get_timestamp ();
GST_DEBUG_OBJECT (downloader, "Signaling chain funtion");
g_cond_signal (&downloader->priv->cond);
}
GST_OBJECT_UNLOCK (downloader);
gst_event_unref (event);
break;
}
case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
const GstStructure *str;
str = gst_event_get_structure (event);
if (gst_structure_has_name (str, "http-headers")) {
GST_OBJECT_LOCK (downloader);
if (downloader->priv->download != NULL) {
if (downloader->priv->download->headers)
gst_structure_free (downloader->priv->download->headers);
downloader->priv->download->headers = gst_structure_copy (str);
}
GST_OBJECT_UNLOCK (downloader);
}
}
/* falls through */
default:
ret = gst_pad_event_default (pad, parent, event);
break;
}
return ret;
}
static GstBusSyncReply
gst_uri_downloader_bus_handler (GstBus * bus,
GstMessage * message, gpointer data)
{
GstUriDownloader *downloader = (GstUriDownloader *) (data);
if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
GError *err = NULL;
gchar *dbg_info = NULL;
gchar *new_error = NULL;
gst_message_parse_error (message, &err, &dbg_info);
GST_WARNING_OBJECT (downloader,
"Received error: %s from %s, the download will be cancelled",
err->message, GST_OBJECT_NAME (message->src));
GST_DEBUG ("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
if (dbg_info)
new_error = g_strdup_printf ("%s: %s\n", err->message, dbg_info);
if (new_error) {
g_free (err->message);
err->message = new_error;
}
if (!downloader->priv->err)
downloader->priv->err = err;
else
g_error_free (err);
g_free (dbg_info);
/* remove the sync handler to avoid duplicated messages */
gst_bus_set_sync_handler (downloader->priv->bus, NULL, NULL, NULL);
/* stop the download */
GST_OBJECT_LOCK (downloader);
if (downloader->priv->download != NULL) {
GST_DEBUG_OBJECT (downloader, "Stopping download");
g_object_unref (downloader->priv->download);
downloader->priv->download = NULL;
downloader->priv->cancelled = TRUE;
g_cond_signal (&downloader->priv->cond);
}
GST_OBJECT_UNLOCK (downloader);
} else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
GError *err = NULL;
gchar *dbg_info = NULL;
gst_message_parse_warning (message, &err, &dbg_info);
GST_WARNING_OBJECT (downloader,
"Received warning: %s from %s",
GST_OBJECT_NAME (message->src), err->message);
GST_DEBUG ("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
g_error_free (err);
g_free (dbg_info);
}
gst_message_unref (message);
return GST_BUS_DROP;
}
static GstFlowReturn
gst_uri_downloader_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
GstUriDownloader *downloader;
downloader = GST_URI_DOWNLOADER (gst_pad_get_element_private (pad));
/* HTML errors (404, 500, etc...) are also pushed through this pad as
* response but the source element will also post a warning or error message
* in the bus, which is handled synchronously cancelling the download.
*/
GST_OBJECT_LOCK (downloader);
if (downloader->priv->download == NULL) {
/* Download cancelled, quit */
gst_buffer_unref (buf);
GST_OBJECT_UNLOCK (downloader);
goto done;
}
GST_LOG_OBJECT (downloader, "The uri fetcher received a new buffer "
"of size %" G_GSIZE_FORMAT, gst_buffer_get_size (buf));
downloader->priv->got_buffer = TRUE;
if (!gst_fragment_add_buffer (downloader->priv->download, buf)) {
GST_WARNING_OBJECT (downloader, "Could not add buffer to fragment");
gst_buffer_unref (buf);
}
GST_OBJECT_UNLOCK (downloader);
done:
{
return GST_FLOW_OK;
}
}
void
gst_uri_downloader_reset (GstUriDownloader * downloader)
{
g_return_if_fail (downloader != NULL);
GST_OBJECT_LOCK (downloader);
downloader->priv->cancelled = FALSE;
GST_OBJECT_UNLOCK (downloader);
}
void
gst_uri_downloader_cancel (GstUriDownloader * downloader)
{
GST_OBJECT_LOCK (downloader);
if (downloader->priv->download != NULL) {
GST_DEBUG_OBJECT (downloader, "Cancelling download");
g_object_unref (downloader->priv->download);
downloader->priv->download = NULL;
downloader->priv->cancelled = TRUE;
GST_DEBUG_OBJECT (downloader, "Signaling chain funtion");
g_cond_signal (&downloader->priv->cond);
} else {
gboolean cancelled;
cancelled = downloader->priv->cancelled;
downloader->priv->cancelled = TRUE;
if (cancelled)
GST_DEBUG_OBJECT (downloader,
"Trying to cancel a download that was alredy cancelled");
}
GST_OBJECT_UNLOCK (downloader);
}
static gboolean
gst_uri_downloader_set_range (GstUriDownloader * downloader,
gint64 range_start, gint64 range_end)
{
g_return_val_if_fail (range_start >= 0, FALSE);
g_return_val_if_fail (range_end >= -1, FALSE);
if (range_start || (range_end >= 0)) {
GstEvent *seek;
seek = gst_event_new_seek (1.0, GST_FORMAT_BYTES, GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, range_start, GST_SEEK_TYPE_SET, range_end);
return gst_element_send_event (downloader->priv->urisrc, seek);
}
return TRUE;
}
static gboolean
gst_uri_downloader_ensure_src (GstUriDownloader * downloader, const gchar * uri)
{
if (downloader->priv->urisrc) {
gchar *old_protocol, *new_protocol;
gchar *old_uri;
old_uri =
gst_uri_handler_get_uri (GST_URI_HANDLER (downloader->priv->urisrc));
old_protocol = gst_uri_get_protocol (old_uri);
new_protocol = gst_uri_get_protocol (uri);
if (!g_str_equal (old_protocol, new_protocol)) {
gst_uri_downloader_destroy_src (downloader);
GST_DEBUG_OBJECT (downloader, "Can't re-use old source element");
} else {
GError *err = NULL;
GST_DEBUG_OBJECT (downloader, "Re-using old source element");
if (!gst_uri_handler_set_uri
(GST_URI_HANDLER (downloader->priv->urisrc), uri, &err)) {
GST_DEBUG_OBJECT (downloader,
"Failed to re-use old source element: %s", err->message);
g_clear_error (&err);
gst_uri_downloader_destroy_src (downloader);
}
}
g_free (old_uri);
g_free (old_protocol);
g_free (new_protocol);
}
if (!downloader->priv->urisrc) {
GST_DEBUG_OBJECT (downloader, "Creating source element for the URI:%s",
uri);
downloader->priv->urisrc =
gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
if (downloader->priv->urisrc) {
/* gst_element_make_from_uri returns a floating reference
* and we are not going to transfer the ownership, so we
* should take it.
*/
gst_object_ref_sink (downloader->priv->urisrc);
}
}
return downloader->priv->urisrc != NULL;
}
static void
gst_uri_downloader_destroy_src (GstUriDownloader * downloader)
{
if (!downloader->priv->urisrc)
return;
gst_element_set_state (downloader->priv->urisrc, GST_STATE_NULL);
gst_object_unref (downloader->priv->urisrc);
downloader->priv->urisrc = NULL;
}
static gboolean
gst_uri_downloader_set_uri (GstUriDownloader * downloader, const gchar * uri,
const gchar * referer, gboolean compress,
gboolean refresh, gboolean allow_cache)
{
GstPad *pad;
GObjectClass *gobject_class;
if (!gst_uri_is_valid (uri))
return FALSE;
if (!gst_uri_downloader_ensure_src (downloader, uri))
return FALSE;
gobject_class = G_OBJECT_GET_CLASS (downloader->priv->urisrc);
if (g_object_class_find_property (gobject_class, "compress"))
g_object_set (downloader->priv->urisrc, "compress", compress, NULL);
if (g_object_class_find_property (gobject_class, "keep-alive"))
g_object_set (downloader->priv->urisrc, "keep-alive", TRUE, NULL);
if (g_object_class_find_property (gobject_class, "extra-headers")) {
if (referer || refresh || !allow_cache) {
GstStructure *extra_headers = gst_structure_new_empty ("headers");
if (referer)
gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
NULL);
if (!allow_cache)
gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
"no-cache", NULL);
else if (refresh)
gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
"max-age=0", NULL);
g_object_set (downloader->priv->urisrc, "extra-headers", extra_headers,
NULL);
gst_structure_free (extra_headers);
} else {
g_object_set (downloader->priv->urisrc, "extra-headers", NULL, NULL);
}
}
/* add a sync handler for the bus messages to detect errors in the download */
gst_element_set_bus (GST_ELEMENT (downloader->priv->urisrc),
downloader->priv->bus);
gst_bus_set_sync_handler (downloader->priv->bus,
gst_uri_downloader_bus_handler, downloader, NULL);
pad = gst_element_get_static_pad (downloader->priv->urisrc, "src");
if (!pad)
return FALSE;
gst_pad_link (pad, downloader->priv->pad);
gst_object_unref (pad);
return TRUE;
}
static gboolean
gst_uri_downloader_set_method (GstUriDownloader * downloader,
const gchar * method)
{
GObjectClass *gobject_class;
if (!downloader->priv->urisrc)
return FALSE;
gobject_class = G_OBJECT_GET_CLASS (downloader->priv->urisrc);
if (g_object_class_find_property (gobject_class, "method")) {
g_object_set (downloader->priv->urisrc, "method", method, NULL);
return TRUE;
}
return FALSE;
}
GstFragment *
gst_uri_downloader_fetch_uri (GstUriDownloader * downloader,
const gchar * uri, const gchar * referer, gboolean compress,
gboolean refresh, gboolean allow_cache, GError ** err)
{
return gst_uri_downloader_fetch_uri_with_range (downloader, uri,
referer, compress, refresh, allow_cache, 0, -1, err);
}
/**
* gst_uri_downloader_fetch_uri_with_range:
* @downloader: the #GstUriDownloader
* @uri: the uri
* @range_start: the starting byte index
* @range_end: the final byte index, use -1 for unspecified
*
* Returns the downloaded #GstFragment
*/
GstFragment *
gst_uri_downloader_fetch_uri_with_range (GstUriDownloader *
downloader, const gchar * uri, const gchar * referer, gboolean compress,
gboolean refresh, gboolean allow_cache,
gint64 range_start, gint64 range_end, GError ** err)
{
GstStateChangeReturn ret;
GstFragment *download = NULL;
GST_DEBUG_OBJECT (downloader, "Fetching URI %s", uri);
g_mutex_lock (&downloader->priv->download_lock);
downloader->priv->err = NULL;
downloader->priv->got_buffer = FALSE;
GST_OBJECT_LOCK (downloader);
if (downloader->priv->cancelled) {
GST_DEBUG_OBJECT (downloader, "Cancelled, aborting fetch");
goto quit;
}
if (!gst_uri_downloader_set_uri (downloader, uri, referer, compress, refresh,
allow_cache)) {
GST_WARNING_OBJECT (downloader, "Failed to set URI");
goto quit;
}
gst_bus_set_flushing (downloader->priv->bus, FALSE);
if (downloader->priv->download)
g_object_unref (downloader->priv->download);
downloader->priv->download = gst_fragment_new ();
downloader->priv->download->range_start = range_start;
downloader->priv->download->range_end = range_end;
GST_OBJECT_UNLOCK (downloader);
ret = gst_element_set_state (downloader->priv->urisrc, GST_STATE_READY);
GST_OBJECT_LOCK (downloader);
if (ret == GST_STATE_CHANGE_FAILURE || downloader->priv->download == NULL) {
GST_WARNING_OBJECT (downloader, "Failed to set src to READY");
goto quit;
}
/* might have been cancelled because of failures in state change */
if (downloader->priv->cancelled) {
goto quit;
}
if (range_start < 0 && range_end < 0) {
if (!gst_uri_downloader_set_method (downloader, "HEAD")) {
GST_WARNING_OBJECT (downloader, "Failed to set HTTP method");
goto quit;
}
} else {
if (!gst_uri_downloader_set_range (downloader, range_start, range_end)) {
GST_WARNING_OBJECT (downloader, "Failed to set range");
goto quit;
}
}
GST_OBJECT_UNLOCK (downloader);
ret = gst_element_set_state (downloader->priv->urisrc, GST_STATE_PLAYING);
GST_OBJECT_LOCK (downloader);
if (ret == GST_STATE_CHANGE_FAILURE) {
if (downloader->priv->download) {
g_object_unref (downloader->priv->download);
downloader->priv->download = NULL;
}
goto quit;
}
/* might have been cancelled because of failures in state change */
if (downloader->priv->cancelled) {
goto quit;
}
/* wait until:
* - the download succeed (EOS in the src pad)
* - the download failed (Error message on the fetcher bus)
* - the download was canceled
*/
GST_DEBUG_OBJECT (downloader, "Waiting to fetch the URI %s", uri);
while (!downloader->priv->cancelled && !downloader->priv->download->completed)
g_cond_wait (&downloader->priv->cond, GST_OBJECT_GET_LOCK (downloader));
if (downloader->priv->cancelled) {
if (downloader->priv->download) {
g_object_unref (downloader->priv->download);
downloader->priv->download = NULL;
}
goto quit;
}
download = downloader->priv->download;
downloader->priv->download = NULL;
if (!downloader->priv->got_buffer) {
if (download->range_start < 0 && download->range_end < 0) {
/* HEAD request, so we don't expect a response */
} else {
g_object_unref (download);
download = NULL;
GST_ERROR_OBJECT (downloader, "Didn't retrieve a buffer before EOS");
}
}
if (download != NULL)
GST_INFO_OBJECT (downloader, "URI fetched successfully");
else
GST_INFO_OBJECT (downloader, "Error fetching URI");
quit:
{
if (downloader->priv->urisrc) {
GstPad *pad;
GstElement *urisrc;
urisrc = downloader->priv->urisrc;
GST_DEBUG_OBJECT (downloader, "Stopping source element %s",
GST_ELEMENT_NAME (urisrc));
/* remove the bus' sync handler */
gst_bus_set_sync_handler (downloader->priv->bus, NULL, NULL, NULL);
gst_bus_set_flushing (downloader->priv->bus, TRUE);
/* set the element state to NULL */
GST_OBJECT_UNLOCK (downloader);
if (download == NULL) {
gst_element_set_state (urisrc, GST_STATE_NULL);
} else {
GstQuery *query;
/* Download successfull, let's query the URI */
query = gst_query_new_uri ();
if (gst_element_query (urisrc, query)) {
gst_query_parse_uri (query, &download->uri);
gst_query_parse_uri_redirection (query, &download->redirect_uri);
gst_query_parse_uri_redirection_permanent (query,
&download->redirect_permanent);
}
gst_query_unref (query);
gst_element_set_state (urisrc, GST_STATE_READY);
}
GST_OBJECT_LOCK (downloader);
gst_element_set_bus (urisrc, NULL);
/* unlink the source element from the internal pad */
pad = gst_pad_get_peer (downloader->priv->pad);
if (pad) {
gst_pad_unlink (pad, downloader->priv->pad);
gst_object_unref (pad);
}
}
GST_OBJECT_UNLOCK (downloader);
if (download == NULL) {
if (!downloader->priv->err) {
g_set_error (err, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
"Failed to download '%s'", uri);
} else {
g_propagate_error (err, downloader->priv->err);
downloader->priv->err = NULL;
}
}
downloader->priv->cancelled = FALSE;
g_mutex_unlock (&downloader->priv->download_lock);
return download;
}
}