mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-27 12:11:13 +00:00
5ae3c9318d
libsoup 3.0.x dispatches using a single source attached when the session is created, so we need to create the session with the same context that our download thread is later using. 2.74 or 3.1 will dispatch a response using the context which sent the request. However, for any context other than the one that created the session, this will also create and destroy sources, so there's still some slight performance benefit. Fixes: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/1384 Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2913>
1010 lines
30 KiB
C
1010 lines
30 KiB
C
/* GStreamer
|
|
* Copyright (C) 2021-2022 Jan Schmidt <jan@centricular.com>
|
|
*
|
|
* downloadhelper.c:
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* Youshould have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#include "downloadhelper.h"
|
|
#include "../soup/gstsouploader.h"
|
|
|
|
GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
|
|
#define GST_CAT_DEFAULT adaptivedemux2_debug
|
|
|
|
#define CHUNK_BUFFER_SIZE 32768
|
|
|
|
typedef struct DownloadHelperTransfer DownloadHelperTransfer;
|
|
|
|
struct DownloadHelper
|
|
{
|
|
GThread *transfer_thread;
|
|
|
|
gboolean running;
|
|
|
|
GstAdaptiveDemuxClock *clock;
|
|
|
|
GMainContext *transfer_context;
|
|
GMainLoop *loop;
|
|
SoupSession *session;
|
|
|
|
GMutex transfer_lock;
|
|
GArray *active_transfers;
|
|
|
|
GAsyncQueue *transfer_requests;
|
|
GSource *transfer_requests_source;
|
|
|
|
gchar *referer;
|
|
gchar *user_agent;
|
|
gchar **cookies;
|
|
};
|
|
|
|
struct DownloadHelperTransfer
|
|
{
|
|
DownloadHelper *dh;
|
|
|
|
gboolean blocking;
|
|
gboolean complete;
|
|
gboolean progress_pending;
|
|
|
|
GCond cond;
|
|
|
|
GCancellable *cancellable;
|
|
|
|
SoupMessage *msg;
|
|
gboolean request_sent;
|
|
|
|
/* Current read buffer */
|
|
char *read_buffer;
|
|
guint64 read_buffer_size;
|
|
guint64 read_position; /* Start in bytes of the read_buffer */
|
|
|
|
DownloadRequest *request;
|
|
};
|
|
|
|
static void
|
|
free_transfer (DownloadHelperTransfer * transfer)
|
|
{
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
if (request)
|
|
download_request_unref (request);
|
|
|
|
if (transfer->blocking)
|
|
g_cond_clear (&transfer->cond);
|
|
|
|
g_object_unref (transfer->msg);
|
|
g_free (transfer->read_buffer);
|
|
g_free (transfer);
|
|
}
|
|
|
|
static void
|
|
transfer_completion_cb (gpointer src_object, GAsyncResult * res,
|
|
gpointer user_data)
|
|
{
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data ((GTask *) res);
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
if (transfer->blocking)
|
|
return; /* Somehow a completion got signalled for a blocking request */
|
|
|
|
download_request_lock (request);
|
|
request->in_use = FALSE;
|
|
GST_LOG ("Despatching completion for transfer %p request %p", transfer,
|
|
request);
|
|
download_request_despatch_completion (request);
|
|
download_request_unlock (request);
|
|
}
|
|
|
|
static gboolean
|
|
transfer_report_progress_cb (gpointer task)
|
|
{
|
|
DownloadHelperTransfer *transfer;
|
|
DownloadRequest *request;
|
|
|
|
/* Already completed - late callback */
|
|
if (g_task_get_completed (task))
|
|
return FALSE;
|
|
|
|
transfer = g_task_get_task_data (task);
|
|
request = transfer->request;
|
|
|
|
download_request_lock (request);
|
|
if (request->send_progress) {
|
|
GST_LOG ("Despatching progress for transfer %p request %p", transfer,
|
|
request);
|
|
download_request_despatch_progresss (request);
|
|
}
|
|
transfer->progress_pending = FALSE;
|
|
download_request_unlock (request);
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
static GTask *
|
|
transfer_task_new (DownloadHelper * dh, DownloadRequest * request,
|
|
SoupMessage * msg, gboolean blocking)
|
|
{
|
|
GTask *transfer_task = NULL;
|
|
DownloadHelperTransfer *transfer = g_new0 (DownloadHelperTransfer, 1);
|
|
|
|
transfer->blocking = blocking;
|
|
if (transfer->blocking)
|
|
g_cond_init (&transfer->cond);
|
|
|
|
transfer->cancellable = g_cancellable_new ();
|
|
transfer->request = download_request_ref (request);
|
|
|
|
transfer->dh = dh;
|
|
transfer->msg = msg;
|
|
|
|
transfer_task =
|
|
g_task_new (NULL, transfer->cancellable,
|
|
(GAsyncReadyCallback) transfer_completion_cb, NULL);
|
|
g_task_set_task_data (transfer_task, transfer,
|
|
(GDestroyNotify) free_transfer);
|
|
|
|
return transfer_task;
|
|
}
|
|
|
|
static void
|
|
release_transfer_task_by_ref (GTask ** transfer_task)
|
|
{
|
|
g_object_unref (*transfer_task);
|
|
}
|
|
|
|
/* Called with download_request lock held */
|
|
static void
|
|
transfer_task_report_progress (GTask * transfer_task)
|
|
{
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
DownloadRequest *request = transfer->request;
|
|
GSource *idle_source;
|
|
|
|
if (transfer->progress_pending == TRUE || !request->send_progress)
|
|
return;
|
|
|
|
/* There's no progress cb pending and this download wants reports, so
|
|
* attach an idle source */
|
|
transfer->progress_pending = TRUE;
|
|
idle_source = g_idle_source_new ();
|
|
g_task_attach_source (transfer_task, idle_source,
|
|
transfer_report_progress_cb);
|
|
g_source_unref (idle_source);
|
|
}
|
|
|
|
static void
|
|
finish_transfer_task (DownloadHelper * dh, GTask * transfer_task,
|
|
GError * error)
|
|
{
|
|
int i;
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
for (i = dh->active_transfers->len - 1; i >= 0; i--) {
|
|
if (transfer_task == g_array_index (dh->active_transfers, GTask *, i)) {
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
transfer->complete = TRUE;
|
|
|
|
if (transfer->blocking)
|
|
g_cond_broadcast (&transfer->cond);
|
|
else if (error != NULL)
|
|
g_task_return_error (transfer_task, error);
|
|
else
|
|
g_task_return_boolean (transfer_task, TRUE);
|
|
|
|
/* This drops the task ref: */
|
|
g_array_remove_index_fast (dh->active_transfers, i);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
return;
|
|
}
|
|
}
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
GST_WARNING ("Did not find transfer %p in the active transfer list",
|
|
transfer_task);
|
|
}
|
|
|
|
static gboolean
|
|
new_read_buffer (DownloadHelperTransfer * transfer)
|
|
{
|
|
gint buffer_size = CHUNK_BUFFER_SIZE;
|
|
#if 0
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
if (request->range_end != -1) {
|
|
if (request->range_end <= transfer->read_position) {
|
|
transfer->read_buffer = NULL;
|
|
transfer->read_buffer_size = 0;
|
|
return FALSE;
|
|
}
|
|
if (request->range_end - transfer->read_position < buffer_size)
|
|
buffer_size = request->range_end - transfer->read_position + 1;
|
|
}
|
|
#endif
|
|
|
|
transfer->read_buffer = g_new (char, buffer_size);
|
|
transfer->read_buffer_size = buffer_size;
|
|
return TRUE;
|
|
}
|
|
|
|
static void
|
|
on_read_ready (GObject * source, GAsyncResult * result, gpointer user_data)
|
|
{
|
|
GTask *transfer_task = user_data;
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
DownloadHelper *dh = transfer->dh;
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
GInputStream *in = G_INPUT_STREAM (source);
|
|
GError *error = NULL;
|
|
gsize bytes_read = 0;
|
|
|
|
GstClockTime now = gst_adaptive_demux_clock_get_time (dh->clock);
|
|
|
|
g_input_stream_read_all_finish (in, result, &bytes_read, &error);
|
|
|
|
download_request_lock (request);
|
|
|
|
if (error) {
|
|
g_free (transfer->read_buffer);
|
|
transfer->read_buffer = NULL;
|
|
|
|
if (!g_cancellable_is_cancelled (transfer->cancellable)) {
|
|
GST_ERROR ("Failed to read stream: %s", error->message);
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
finish_transfer_task (dh, transfer_task, error);
|
|
} else {
|
|
/* Ignore error from cancelled operation */
|
|
g_error_free (error);
|
|
finish_transfer_task (dh, transfer_task, NULL);
|
|
}
|
|
download_request_unlock (request);
|
|
|
|
return;
|
|
}
|
|
|
|
if (!bytes_read) {
|
|
goto finish_transfer;
|
|
} else {
|
|
GstBuffer *gst_buffer =
|
|
gst_buffer_new_wrapped (transfer->read_buffer, bytes_read);
|
|
|
|
GST_BUFFER_OFFSET (gst_buffer) = transfer->read_position;
|
|
transfer->read_position += bytes_read;
|
|
transfer->read_buffer = NULL;
|
|
|
|
/* Clip the buffer to within the range */
|
|
if (GST_BUFFER_OFFSET (gst_buffer) < request->range_start) {
|
|
if (transfer->read_position <= request->range_start) {
|
|
GST_DEBUG ("Discarding %" G_GSIZE_FORMAT
|
|
" bytes entirely before requested range",
|
|
gst_buffer_get_size (gst_buffer));
|
|
/* This buffer is completely before the range start, discard it */
|
|
gst_buffer_unref (gst_buffer);
|
|
gst_buffer = NULL;
|
|
} else {
|
|
GST_DEBUG ("Clipping first %" G_GINT64_FORMAT
|
|
" bytes before requested range",
|
|
request->range_start - GST_BUFFER_OFFSET (gst_buffer));
|
|
|
|
/* This buffer is partially within the requested range, clip the beginning */
|
|
gst_buffer_resize (gst_buffer,
|
|
request->range_start - GST_BUFFER_OFFSET (gst_buffer), -1);
|
|
GST_BUFFER_OFFSET (gst_buffer) = request->range_start;
|
|
}
|
|
}
|
|
|
|
if (request->download_start_time == GST_CLOCK_TIME_NONE) {
|
|
GST_LOG ("Got first data for URI %s", request->uri);
|
|
request->download_start_time = now;
|
|
}
|
|
|
|
if (gst_buffer != NULL) {
|
|
/* Unsent means cancellation is in progress, so don't override
|
|
* the state. Otherwise make sure it is LOADING */
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
|
|
request->state = DOWNLOAD_REQUEST_STATE_LOADING;
|
|
|
|
GST_LOG ("Adding %u bytes to buffer",
|
|
(guint) (gst_buffer_get_size (gst_buffer)));
|
|
|
|
download_request_add_buffer (request, gst_buffer);
|
|
|
|
transfer_task_report_progress (transfer_task);
|
|
}
|
|
|
|
/* Resubmit the read request to get more */
|
|
if (!new_read_buffer (transfer))
|
|
goto finish_transfer;
|
|
|
|
g_main_context_push_thread_default (dh->transfer_context);
|
|
g_input_stream_read_all_async (in, transfer->read_buffer,
|
|
transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
|
|
on_read_ready, transfer_task);
|
|
g_main_context_pop_thread_default (dh->transfer_context);
|
|
}
|
|
|
|
download_request_unlock (request);
|
|
return;
|
|
|
|
finish_transfer:
|
|
if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
|
|
SoupStatus status_code = _soup_message_get_status (transfer->msg);
|
|
|
|
GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, status_code, request->uri,
|
|
request->range_start, request->range_end);
|
|
|
|
if (SOUP_STATUS_IS_SUCCESSFUL (status_code)
|
|
|| SOUP_STATUS_IS_REDIRECTION (status_code))
|
|
request->state = DOWNLOAD_REQUEST_STATE_COMPLETE;
|
|
else
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
}
|
|
request->download_end_time = now;
|
|
|
|
g_free (transfer->read_buffer);
|
|
transfer->read_buffer = NULL;
|
|
|
|
download_request_unlock (request);
|
|
|
|
finish_transfer_task (dh, transfer_task, NULL);
|
|
}
|
|
|
|
static void
|
|
http_header_to_structure (const gchar * name, const gchar * value,
|
|
gpointer user_data)
|
|
{
|
|
GstStructure *headers = user_data;
|
|
const GValue *gv;
|
|
|
|
if (!g_utf8_validate (name, -1, NULL) || !g_utf8_validate (value, -1, NULL))
|
|
return;
|
|
|
|
gv = gst_structure_get_value (headers, name);
|
|
if (gv && GST_VALUE_HOLDS_ARRAY (gv)) {
|
|
GValue v = G_VALUE_INIT;
|
|
|
|
g_value_init (&v, G_TYPE_STRING);
|
|
g_value_set_string (&v, value);
|
|
gst_value_array_append_value ((GValue *) gv, &v);
|
|
g_value_unset (&v);
|
|
} else if (gv && G_VALUE_HOLDS_STRING (gv)) {
|
|
GValue arr = G_VALUE_INIT;
|
|
GValue v = G_VALUE_INIT;
|
|
const gchar *old_value = g_value_get_string (gv);
|
|
|
|
g_value_init (&arr, GST_TYPE_ARRAY);
|
|
g_value_init (&v, G_TYPE_STRING);
|
|
g_value_set_string (&v, old_value);
|
|
gst_value_array_append_value (&arr, &v);
|
|
g_value_set_string (&v, value);
|
|
gst_value_array_append_value (&arr, &v);
|
|
|
|
gst_structure_set_value (headers, name, &arr);
|
|
g_value_unset (&v);
|
|
g_value_unset (&arr);
|
|
} else {
|
|
gst_structure_set (headers, name, G_TYPE_STRING, value, NULL);
|
|
}
|
|
}
|
|
|
|
static void
|
|
soup_msg_restarted_cb (SoupMessage * msg, gpointer user_data)
|
|
{
|
|
GTask *transfer_task = user_data;
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
DownloadRequest *request = transfer->request;
|
|
SoupStatus status = _soup_message_get_status (msg);
|
|
|
|
if (SOUP_STATUS_IS_REDIRECTION (status)) {
|
|
char *redirect_uri = gst_soup_message_uri_to_string (msg);
|
|
gboolean redirect_permanent = (status == SOUP_STATUS_MOVED_PERMANENTLY);
|
|
|
|
GST_DEBUG ("%u redirect to \"%s\" (permanent %d)",
|
|
status, redirect_uri, redirect_permanent);
|
|
|
|
download_request_lock (request);
|
|
g_free (request->redirect_uri);
|
|
request->redirect_uri = redirect_uri;
|
|
request->redirect_permanent = redirect_permanent;
|
|
download_request_unlock (request);
|
|
}
|
|
}
|
|
|
|
static GstStructure *
|
|
handle_response_headers (DownloadHelperTransfer * transfer)
|
|
{
|
|
DownloadRequest *request = transfer->request;
|
|
SoupMessage *msg = transfer->msg;
|
|
SoupMessageHeaders *response_headers;
|
|
GstStructure *http_headers, *headers;
|
|
|
|
http_headers = gst_structure_new_empty ("http-headers");
|
|
|
|
#if 0
|
|
if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED &&
|
|
src->proxy_id && src->proxy_pw) {
|
|
/* wait for authenticate callback */
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
if (src->redirection_uri)
|
|
gst_structure_set (http_headers, "redirection-uri", G_TYPE_STRING,
|
|
src->redirection_uri, NULL);
|
|
#endif
|
|
|
|
headers = gst_structure_new_empty ("request-headers");
|
|
_soup_message_headers_foreach (_soup_message_get_request_headers (msg),
|
|
http_header_to_structure, headers);
|
|
gst_structure_set (http_headers, "request-headers", GST_TYPE_STRUCTURE,
|
|
headers, NULL);
|
|
gst_structure_free (headers);
|
|
headers = gst_structure_new_empty ("response-headers");
|
|
response_headers = _soup_message_get_response_headers (msg);
|
|
_soup_message_headers_foreach (response_headers, http_header_to_structure,
|
|
headers);
|
|
gst_structure_set (http_headers, "response-headers", GST_TYPE_STRUCTURE,
|
|
headers, NULL);
|
|
gst_structure_free (headers);
|
|
|
|
#if 0
|
|
if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
|
|
/* force an error */
|
|
gst_structure_free (http_headers);
|
|
return gst_soup_http_src_parse_status (msg, src);
|
|
}
|
|
#endif
|
|
|
|
/* Parse Content-Length. */
|
|
if (SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (msg)) &&
|
|
(_soup_message_headers_get_encoding (response_headers) ==
|
|
SOUP_ENCODING_CONTENT_LENGTH)) {
|
|
request->content_length =
|
|
_soup_message_headers_get_content_length (response_headers);
|
|
}
|
|
/* Parse Content-Range in a partial content response to set our initial read_position */
|
|
transfer->read_position = 0;
|
|
if (_soup_message_get_status (msg) == SOUP_STATUS_PARTIAL_CONTENT) {
|
|
goffset start, end;
|
|
if (_soup_message_headers_get_content_range (response_headers, &start,
|
|
&end, NULL)) {
|
|
GST_DEBUG ("Content-Range response %" G_GOFFSET_FORMAT "-%"
|
|
G_GOFFSET_FORMAT, start, end);
|
|
transfer->read_position = start;
|
|
}
|
|
}
|
|
if (transfer->read_position != request->range_start) {
|
|
GST_WARNING ("Server did not respect our range request for range %"
|
|
G_GINT64_FORMAT " to %" G_GINT64_FORMAT " - starting at offset %"
|
|
G_GUINT64_FORMAT, request->range_start, request->range_end,
|
|
transfer->read_position);
|
|
}
|
|
|
|
return http_headers;
|
|
}
|
|
|
|
static void
|
|
on_request_sent (GObject * source, GAsyncResult * result, gpointer user_data)
|
|
{
|
|
GTask *transfer_task = user_data;
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
DownloadHelper *dh = transfer->dh;
|
|
DownloadRequest *request = transfer->request;
|
|
SoupMessage *msg = transfer->msg;
|
|
GError *error = NULL;
|
|
|
|
GInputStream *in =
|
|
_soup_session_send_finish ((SoupSession *) source, result, &error);
|
|
|
|
download_request_lock (request);
|
|
|
|
if (in == NULL) {
|
|
request->status_code = _soup_message_get_status (msg);
|
|
|
|
if (!g_cancellable_is_cancelled (transfer->cancellable)) {
|
|
GST_LOG ("request errored. Code %d URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, request->status_code, request->uri,
|
|
request->range_start, request->range_end);
|
|
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
finish_transfer_task (dh, transfer_task, error);
|
|
} else {
|
|
/* Ignore error from cancelled operation */
|
|
g_error_free (error);
|
|
finish_transfer_task (dh, transfer_task, NULL);
|
|
}
|
|
download_request_unlock (request);
|
|
|
|
/* No async callback queued - the transfer is done */
|
|
finish_transfer_task (dh, transfer_task, error);
|
|
return;
|
|
}
|
|
|
|
/* If the state went back to UNSENT, we were cancelled so don't override it */
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT &&
|
|
request->state != DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED) {
|
|
|
|
request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
|
|
request->status_code = _soup_message_get_status (msg);
|
|
request->headers = handle_response_headers (transfer);
|
|
|
|
if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code)
|
|
|| SOUP_STATUS_IS_REDIRECTION (request->status_code)) {
|
|
request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
|
|
transfer_task_report_progress (transfer_task);
|
|
} else {
|
|
goto finish_transfer_error;
|
|
}
|
|
}
|
|
|
|
if (!new_read_buffer (transfer))
|
|
goto finish_transfer_error;
|
|
|
|
download_request_unlock (request);
|
|
|
|
g_main_context_push_thread_default (dh->transfer_context);
|
|
g_input_stream_read_all_async (in, transfer->read_buffer,
|
|
transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
|
|
on_read_ready, transfer_task);
|
|
g_main_context_pop_thread_default (dh->transfer_context);
|
|
|
|
g_object_unref (in);
|
|
return;
|
|
|
|
finish_transfer_error:
|
|
request->download_end_time = gst_adaptive_demux_clock_get_time (dh->clock);
|
|
|
|
if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
|
|
GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, _soup_message_get_status (msg), request->uri,
|
|
request->range_start, request->range_end);
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
}
|
|
|
|
g_free (transfer->read_buffer);
|
|
transfer->read_buffer = NULL;
|
|
|
|
download_request_unlock (request);
|
|
finish_transfer_task (dh, transfer_task, NULL);
|
|
g_object_unref (in);
|
|
}
|
|
|
|
DownloadHelper *
|
|
downloadhelper_new (GstAdaptiveDemuxClock * clock)
|
|
{
|
|
DownloadHelper *dh = g_new0 (DownloadHelper, 1);
|
|
|
|
dh->transfer_context = g_main_context_new ();
|
|
dh->loop = g_main_loop_new (dh->transfer_context, FALSE);
|
|
|
|
dh->clock = gst_adaptive_demux_clock_ref (clock);
|
|
|
|
g_mutex_init (&dh->transfer_lock);
|
|
dh->active_transfers = g_array_new (FALSE, FALSE, sizeof (GTask *));
|
|
|
|
g_array_set_clear_func (dh->active_transfers,
|
|
(GDestroyNotify) (release_transfer_task_by_ref));
|
|
|
|
dh->transfer_requests =
|
|
g_async_queue_new_full ((GDestroyNotify) g_object_unref);
|
|
dh->transfer_requests_source = NULL;
|
|
|
|
/* libsoup 3.0 (not 2.74 or 3.1) dispatches using a single source attached
|
|
* when the session is created, so we need to ensure it matches here. */
|
|
g_main_context_push_thread_default (dh->transfer_context);
|
|
|
|
/* Set 10 second timeout. Any longer is likely
|
|
* an attempt to reuse an already closed connection */
|
|
dh->session = _soup_session_new_with_options ("timeout", 10, NULL);
|
|
|
|
g_main_context_pop_thread_default (dh->transfer_context);
|
|
|
|
return dh;
|
|
}
|
|
|
|
void
|
|
downloadhelper_free (DownloadHelper * dh)
|
|
{
|
|
downloadhelper_stop (dh);
|
|
|
|
if (dh->session)
|
|
g_object_unref (dh->session);
|
|
g_main_loop_unref (dh->loop);
|
|
g_main_context_unref (dh->transfer_context);
|
|
|
|
if (dh->clock)
|
|
gst_adaptive_demux_clock_unref (dh->clock);
|
|
|
|
g_array_free (dh->active_transfers, TRUE);
|
|
g_async_queue_unref (dh->transfer_requests);
|
|
|
|
g_free (dh->referer);
|
|
g_free (dh->user_agent);
|
|
g_strfreev (dh->cookies);
|
|
|
|
g_free (dh);
|
|
}
|
|
|
|
void
|
|
downloadhelper_set_referer (DownloadHelper * dh, const gchar * referer)
|
|
{
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
g_free (dh->referer);
|
|
dh->referer = g_strdup (referer);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
void
|
|
downloadhelper_set_user_agent (DownloadHelper * dh, const gchar * user_agent)
|
|
{
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
g_free (dh->user_agent);
|
|
dh->user_agent = g_strdup (user_agent);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
/* Takes ownership of the strv */
|
|
void
|
|
downloadhelper_set_cookies (DownloadHelper * dh, gchar ** cookies)
|
|
{
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
g_strfreev (dh->cookies);
|
|
dh->cookies = cookies;
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
/* Called with the transfer lock held */
|
|
static void
|
|
submit_transfer (DownloadHelper * dh, GTask * transfer_task)
|
|
{
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
download_request_lock (request);
|
|
request->state = DOWNLOAD_REQUEST_STATE_OPEN;
|
|
request->download_request_time =
|
|
gst_adaptive_demux_clock_get_time (dh->clock);
|
|
|
|
GST_LOG ("Submitting request URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, request->uri, request->range_start, request->range_end);
|
|
|
|
transfer_task_report_progress (transfer_task);
|
|
download_request_unlock (request);
|
|
|
|
_soup_session_send_async (dh->session, transfer->msg, transfer->cancellable,
|
|
on_request_sent, transfer_task);
|
|
g_array_append_val (dh->active_transfers, transfer_task);
|
|
}
|
|
|
|
/* Idle callback that submits all pending transfers */
|
|
static gboolean
|
|
submit_transfers_cb (DownloadHelper * dh)
|
|
{
|
|
GTask *transfer;
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
do {
|
|
transfer = g_async_queue_try_pop (dh->transfer_requests);
|
|
if (transfer) {
|
|
submit_transfer (dh, transfer);
|
|
}
|
|
} while (transfer != NULL);
|
|
|
|
/* FIXME: Use a PollFD like GWakeup instead? */
|
|
g_source_destroy (dh->transfer_requests_source);
|
|
g_source_unref (dh->transfer_requests_source);
|
|
dh->transfer_requests_source = NULL;
|
|
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
return G_SOURCE_REMOVE;
|
|
}
|
|
|
|
static gpointer
|
|
dh_transfer_thread_func (gpointer data)
|
|
{
|
|
DownloadHelper *dh = data;
|
|
GST_DEBUG ("DownloadHelper thread starting");
|
|
|
|
g_main_context_push_thread_default (dh->transfer_context);
|
|
g_main_loop_run (dh->loop);
|
|
g_main_context_pop_thread_default (dh->transfer_context);
|
|
|
|
GST_DEBUG ("Exiting DownloadHelper thread");
|
|
return NULL;
|
|
}
|
|
|
|
gboolean
|
|
downloadhelper_start (DownloadHelper * dh)
|
|
{
|
|
g_return_val_if_fail (dh->transfer_thread == NULL, FALSE);
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
if (!dh->running) {
|
|
|
|
dh->transfer_thread =
|
|
g_thread_try_new ("adaptive-download-task", dh_transfer_thread_func, dh,
|
|
NULL);
|
|
dh->running = (dh->transfer_thread != NULL);
|
|
}
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
return dh->running;
|
|
}
|
|
|
|
void
|
|
downloadhelper_stop (DownloadHelper * dh)
|
|
{
|
|
int i;
|
|
GThread *transfer_thread = NULL;
|
|
|
|
GST_DEBUG ("Stopping DownloadHelper loop");
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
|
|
dh->running = FALSE;
|
|
|
|
for (i = 0; i < dh->active_transfers->len; i++) {
|
|
GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
g_cancellable_cancel (transfer->cancellable);
|
|
}
|
|
|
|
g_main_loop_quit (dh->loop);
|
|
|
|
transfer_thread = dh->transfer_thread;
|
|
dh->transfer_thread = NULL;
|
|
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
if (transfer_thread != NULL) {
|
|
g_thread_join (transfer_thread);
|
|
}
|
|
|
|
/* The transfer thread has exited at this point - any remaining transfers are unfinished
|
|
* and need cleaning up */
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
|
|
for (i = 0; i < dh->active_transfers->len; i++) {
|
|
GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
download_request_lock (request);
|
|
/* Reset the state to UNSENT, to indicate cancellation, like an XMLHttpRequest does */
|
|
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
|
|
download_request_unlock (request);
|
|
|
|
transfer->complete = TRUE;
|
|
if (transfer->blocking)
|
|
g_cond_broadcast (&transfer->cond);
|
|
else
|
|
g_task_return_boolean (transfer_task, TRUE);
|
|
}
|
|
|
|
g_array_set_size (dh->active_transfers, 0);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
gboolean
|
|
downloadhelper_submit_request (DownloadHelper * dh,
|
|
const gchar * referer, DownloadFlags flags, DownloadRequest * request,
|
|
GError ** err)
|
|
{
|
|
GTask *transfer_task = NULL;
|
|
const gchar *method;
|
|
SoupMessage *msg;
|
|
SoupMessageHeaders *msg_headers;
|
|
gboolean blocking = (flags & DOWNLOAD_FLAG_BLOCKING) != 0;
|
|
|
|
method =
|
|
(flags & DOWNLOAD_FLAG_HEADERS_ONLY) ? SOUP_METHOD_HEAD : SOUP_METHOD_GET;
|
|
|
|
download_request_lock (request);
|
|
if (request->in_use) {
|
|
GST_ERROR ("Request for URI %s reusing active request object",
|
|
request->uri);
|
|
download_request_unlock (request);
|
|
return FALSE;
|
|
}
|
|
|
|
/* Clear the state back to unsent */
|
|
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
|
|
|
|
msg = _soup_message_new (method, request->uri);
|
|
if (msg == NULL) {
|
|
g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
|
|
"Could not parse download URI %s", request->uri);
|
|
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
download_request_unlock (request);
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
/* NOTE: There was a bug where Akamai servers return the
|
|
* wrong result for a range request on small files. To avoid
|
|
* it if the range starts within the first KB of the file, just
|
|
* start at 0 instead */
|
|
if (request->range_start < 1024)
|
|
request->range_start = 0;
|
|
|
|
msg_headers = _soup_message_get_request_headers (msg);
|
|
|
|
if (request->range_start != 0 || request->range_end != -1) {
|
|
_soup_message_headers_set_range (msg_headers, request->range_start,
|
|
request->range_end);
|
|
}
|
|
|
|
download_request_unlock (request);
|
|
|
|
/* If resubmitting a request, clear any stale / unused data */
|
|
download_request_begin_download (request);
|
|
|
|
if ((flags & DOWNLOAD_FLAG_COMPRESS) == 0) {
|
|
_soup_message_disable_feature (msg, _soup_content_decoder_get_type ());
|
|
}
|
|
if (flags & DOWNLOAD_FLAG_FORCE_REFRESH) {
|
|
_soup_message_headers_append (msg_headers, "Cache-Control", "max-age=0");
|
|
}
|
|
|
|
/* Take the lock to protect header strings */
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
|
|
if (referer != NULL) {
|
|
_soup_message_headers_append (msg_headers, "Referer", referer);
|
|
} else if (dh->referer != NULL) {
|
|
_soup_message_headers_append (msg_headers, "Referer", dh->referer);
|
|
}
|
|
|
|
if (dh->user_agent != NULL) {
|
|
_soup_message_headers_append (msg_headers, "User-Agent", dh->user_agent);
|
|
}
|
|
|
|
if (dh->cookies != NULL) {
|
|
gchar **cookie;
|
|
|
|
for (cookie = dh->cookies; *cookie != NULL; cookie++) {
|
|
_soup_message_headers_append (msg_headers, "Cookie", *cookie);
|
|
}
|
|
}
|
|
|
|
transfer_task = transfer_task_new (dh, request, msg, blocking);
|
|
|
|
if (!dh->running) {
|
|
/* The download helper was deactivated just as we went to dispatch this request.
|
|
* Abort and manually wake the request, as it never went in the active_transfer list */
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
download_request_lock (request);
|
|
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
|
|
request->in_use = FALSE;
|
|
download_request_unlock (request);
|
|
|
|
g_cancellable_cancel (g_task_get_cancellable (transfer_task));
|
|
g_task_return_error_if_cancelled (transfer_task);
|
|
g_object_unref (transfer_task);
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
download_request_lock (request);
|
|
request->in_use = TRUE;
|
|
download_request_unlock (request);
|
|
|
|
g_signal_connect (msg, "restarted", G_CALLBACK (soup_msg_restarted_cb),
|
|
transfer_task);
|
|
|
|
/* Now send the request over to the main loop for actual submission */
|
|
GST_LOG ("Submitting transfer task %p", transfer_task);
|
|
g_async_queue_push (dh->transfer_requests, transfer_task);
|
|
|
|
/* No pending idle source to wake the transfer loop - so create one */
|
|
if (dh->transfer_requests_source == NULL) {
|
|
dh->transfer_requests_source = g_idle_source_new ();
|
|
g_source_set_callback (dh->transfer_requests_source,
|
|
(GSourceFunc) submit_transfers_cb, dh, NULL);
|
|
g_source_attach (dh->transfer_requests_source, dh->transfer_context);
|
|
}
|
|
|
|
if (blocking) {
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
/* We need an extra ref on the task to make sure it stays alive.
|
|
* We pushed it in the async queue, but didn't unlock yet, so while
|
|
* we gave away our ref, the receiver can't have unreffed it */
|
|
g_object_ref (transfer_task);
|
|
while (!transfer->complete)
|
|
g_cond_wait (&transfer->cond, &dh->transfer_lock);
|
|
g_object_unref (transfer_task);
|
|
}
|
|
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
void
|
|
downloadhelper_cancel_request (DownloadHelper * dh, DownloadRequest * request)
|
|
{
|
|
int i;
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
|
|
download_request_lock (request);
|
|
if (!request->in_use)
|
|
goto out;
|
|
|
|
GST_DEBUG ("Cancelling request for URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, request->uri, request->range_start, request->range_end);
|
|
|
|
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
|
|
|
|
for (i = dh->active_transfers->len - 1; i >= 0; i--) {
|
|
GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
if (transfer->request == request) {
|
|
GST_DEBUG ("Found transfer %p for request for URI %s range %"
|
|
G_GINT64_FORMAT " %" G_GINT64_FORMAT, transfer, request->uri,
|
|
request->range_start, request->range_end);
|
|
g_cancellable_cancel (transfer->cancellable);
|
|
break;
|
|
}
|
|
}
|
|
|
|
out:
|
|
download_request_unlock (request);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
DownloadRequest *
|
|
downloadhelper_fetch_uri_range (DownloadHelper * dh, const gchar * uri,
|
|
const gchar * referer, DownloadFlags flags, gint64 range_start,
|
|
gint64 range_end, GError ** err)
|
|
{
|
|
DownloadRequest *request;
|
|
|
|
g_return_val_if_fail (uri != NULL, NULL);
|
|
|
|
GST_DEBUG ("Fetching URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT,
|
|
uri, range_start, range_end);
|
|
|
|
flags |= DOWNLOAD_FLAG_BLOCKING;
|
|
|
|
request = download_request_new_uri_range (uri, range_start, range_end);
|
|
|
|
if (!downloadhelper_submit_request (dh, referer, flags, request, err)) {
|
|
download_request_unref (request);
|
|
return NULL;
|
|
}
|
|
|
|
return request;
|
|
}
|
|
|
|
DownloadRequest *
|
|
downloadhelper_fetch_uri (DownloadHelper * dh, const gchar * uri,
|
|
const gchar * referer, DownloadFlags flags, GError ** err)
|
|
{
|
|
return downloadhelper_fetch_uri_range (dh, uri, referer, flags, 0, -1, err);
|
|
}
|