mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-27 12:11:13 +00:00
5192181ef5
Add a field to the DownloadRequest that reports the most recent time at which data arrived. Update it in the DownloadHelper. Add a method to retrieve the GST_BUFFER_OFFSET() for the DownloadRequest's data buffer (if any). Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3883>
1019 lines
30 KiB
C
1019 lines
30 KiB
C
/* GStreamer
|
|
* Copyright (C) 2021-2022 Jan Schmidt <jan@centricular.com>
|
|
*
|
|
* downloadhelper.c:
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* Youshould have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#include "downloadhelper.h"
|
|
#include "../soup/gstsouploader.h"
|
|
|
|
GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
|
|
#define GST_CAT_DEFAULT adaptivedemux2_debug
|
|
|
|
#define CHUNK_BUFFER_SIZE 32768
|
|
|
|
typedef struct DownloadHelperTransfer DownloadHelperTransfer;
|
|
|
|
struct DownloadHelper
|
|
{
|
|
GThread *transfer_thread;
|
|
|
|
gboolean running;
|
|
|
|
GstAdaptiveDemuxClock *clock;
|
|
|
|
GMainContext *transfer_context;
|
|
GMainLoop *loop;
|
|
SoupSession *session;
|
|
|
|
GMutex transfer_lock;
|
|
GArray *active_transfers;
|
|
|
|
GAsyncQueue *transfer_requests;
|
|
GSource *transfer_requests_source;
|
|
|
|
gchar *referer;
|
|
gchar *user_agent;
|
|
gchar **cookies;
|
|
};
|
|
|
|
struct DownloadHelperTransfer
|
|
{
|
|
DownloadHelper *dh;
|
|
|
|
gboolean blocking;
|
|
gboolean complete;
|
|
gboolean progress_pending;
|
|
|
|
GCond cond;
|
|
|
|
GCancellable *cancellable;
|
|
|
|
SoupMessage *msg;
|
|
gboolean request_sent;
|
|
|
|
/* Current read buffer */
|
|
char *read_buffer;
|
|
guint64 read_buffer_size;
|
|
guint64 read_position; /* Start in bytes of the read_buffer */
|
|
|
|
DownloadRequest *request;
|
|
};
|
|
|
|
static void
|
|
free_transfer (DownloadHelperTransfer * transfer)
|
|
{
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
if (request)
|
|
download_request_unref (request);
|
|
|
|
if (transfer->blocking)
|
|
g_cond_clear (&transfer->cond);
|
|
|
|
g_object_unref (transfer->msg);
|
|
g_free (transfer->read_buffer);
|
|
g_free (transfer);
|
|
}
|
|
|
|
static void
|
|
transfer_completion_cb (gpointer src_object, GAsyncResult * res,
|
|
gpointer user_data)
|
|
{
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data ((GTask *) res);
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
if (transfer->blocking)
|
|
return; /* Somehow a completion got signalled for a blocking request */
|
|
|
|
download_request_lock (request);
|
|
request->in_use = FALSE;
|
|
GST_LOG ("Despatching completion for transfer %p request %p", transfer,
|
|
request);
|
|
download_request_despatch_completion (request);
|
|
download_request_unlock (request);
|
|
}
|
|
|
|
static gboolean
|
|
transfer_report_progress_cb (gpointer task)
|
|
{
|
|
DownloadHelperTransfer *transfer;
|
|
DownloadRequest *request;
|
|
|
|
/* Already completed - late callback */
|
|
if (g_task_get_completed (task))
|
|
return FALSE;
|
|
|
|
transfer = g_task_get_task_data (task);
|
|
request = transfer->request;
|
|
|
|
download_request_lock (request);
|
|
if (request->send_progress) {
|
|
GST_LOG ("Despatching progress for transfer %p request %p", transfer,
|
|
request);
|
|
download_request_despatch_progress (request);
|
|
}
|
|
transfer->progress_pending = FALSE;
|
|
download_request_unlock (request);
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
static GTask *
|
|
transfer_task_new (DownloadHelper * dh, DownloadRequest * request,
|
|
SoupMessage * msg, gboolean blocking)
|
|
{
|
|
GTask *transfer_task = NULL;
|
|
DownloadHelperTransfer *transfer = g_new0 (DownloadHelperTransfer, 1);
|
|
|
|
transfer->blocking = blocking;
|
|
if (transfer->blocking)
|
|
g_cond_init (&transfer->cond);
|
|
|
|
transfer->cancellable = g_cancellable_new ();
|
|
transfer->request = download_request_ref (request);
|
|
|
|
transfer->dh = dh;
|
|
transfer->msg = msg;
|
|
|
|
transfer_task =
|
|
g_task_new (NULL, transfer->cancellable,
|
|
(GAsyncReadyCallback) transfer_completion_cb, NULL);
|
|
g_task_set_task_data (transfer_task, transfer,
|
|
(GDestroyNotify) free_transfer);
|
|
|
|
return transfer_task;
|
|
}
|
|
|
|
static void
|
|
release_transfer_task_by_ref (GTask ** transfer_task)
|
|
{
|
|
g_object_unref (*transfer_task);
|
|
}
|
|
|
|
/* Called with download_request lock held */
|
|
static void
|
|
transfer_task_report_progress (GTask * transfer_task)
|
|
{
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
DownloadRequest *request = transfer->request;
|
|
GSource *idle_source;
|
|
|
|
if (transfer->progress_pending == TRUE || !request->send_progress)
|
|
return;
|
|
|
|
/* There's no progress cb pending and this download wants reports, so
|
|
* attach an idle source */
|
|
transfer->progress_pending = TRUE;
|
|
idle_source = g_idle_source_new ();
|
|
g_task_attach_source (transfer_task, idle_source,
|
|
transfer_report_progress_cb);
|
|
g_source_unref (idle_source);
|
|
}
|
|
|
|
static void
|
|
finish_transfer_task (DownloadHelper * dh, GTask * transfer_task,
|
|
GError * error)
|
|
{
|
|
int i;
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
for (i = dh->active_transfers->len - 1; i >= 0; i--) {
|
|
if (transfer_task == g_array_index (dh->active_transfers, GTask *, i)) {
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
transfer->complete = TRUE;
|
|
|
|
if (transfer->blocking)
|
|
g_cond_broadcast (&transfer->cond);
|
|
else if (error != NULL)
|
|
g_task_return_error (transfer_task, error);
|
|
else
|
|
g_task_return_boolean (transfer_task, TRUE);
|
|
|
|
/* This drops the task ref: */
|
|
g_array_remove_index_fast (dh->active_transfers, i);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
return;
|
|
}
|
|
}
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
GST_WARNING ("Did not find transfer %p in the active transfer list",
|
|
transfer_task);
|
|
}
|
|
|
|
static gboolean
|
|
new_read_buffer (DownloadHelperTransfer * transfer)
|
|
{
|
|
gint buffer_size = CHUNK_BUFFER_SIZE;
|
|
#if 0
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
if (request->range_end != -1) {
|
|
if (request->range_end <= transfer->read_position) {
|
|
transfer->read_buffer = NULL;
|
|
transfer->read_buffer_size = 0;
|
|
return FALSE;
|
|
}
|
|
if (request->range_end - transfer->read_position < buffer_size)
|
|
buffer_size = request->range_end - transfer->read_position + 1;
|
|
}
|
|
#endif
|
|
|
|
transfer->read_buffer = g_new (char, buffer_size);
|
|
transfer->read_buffer_size = buffer_size;
|
|
return TRUE;
|
|
}
|
|
|
|
static void
|
|
on_read_ready (GObject * source, GAsyncResult * result, gpointer user_data)
|
|
{
|
|
GTask *transfer_task = user_data;
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
DownloadHelper *dh = transfer->dh;
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
GInputStream *in = G_INPUT_STREAM (source);
|
|
GError *error = NULL;
|
|
gsize bytes_read = 0;
|
|
|
|
GstClockTime now = gst_adaptive_demux_clock_get_time (dh->clock);
|
|
|
|
gboolean read_failed =
|
|
g_input_stream_read_all_finish (in, result, &bytes_read, &error);
|
|
|
|
download_request_lock (request);
|
|
|
|
if (error) {
|
|
g_free (transfer->read_buffer);
|
|
transfer->read_buffer = NULL;
|
|
|
|
if (!g_cancellable_is_cancelled (transfer->cancellable)) {
|
|
GST_ERROR ("Failed to read stream: %s", error->message);
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
finish_transfer_task (dh, transfer_task, error);
|
|
} else {
|
|
/* Ignore error from cancelled operation */
|
|
g_error_free (error);
|
|
finish_transfer_task (dh, transfer_task, NULL);
|
|
}
|
|
download_request_unlock (request);
|
|
|
|
return;
|
|
}
|
|
|
|
if (bytes_read > 0) {
|
|
GstBuffer *gst_buffer =
|
|
gst_buffer_new_wrapped (transfer->read_buffer, bytes_read);
|
|
|
|
GST_BUFFER_OFFSET (gst_buffer) = transfer->read_position;
|
|
transfer->read_position += bytes_read;
|
|
transfer->read_buffer = NULL;
|
|
|
|
/* Clip the buffer to within the range */
|
|
if (GST_BUFFER_OFFSET (gst_buffer) < request->range_start) {
|
|
if (transfer->read_position <= request->range_start) {
|
|
GST_DEBUG ("Discarding %" G_GSIZE_FORMAT
|
|
" bytes entirely before requested range",
|
|
gst_buffer_get_size (gst_buffer));
|
|
/* This buffer is completely before the range start, discard it */
|
|
gst_buffer_unref (gst_buffer);
|
|
gst_buffer = NULL;
|
|
} else {
|
|
GST_DEBUG ("Clipping first %" G_GINT64_FORMAT
|
|
" bytes before requested range",
|
|
request->range_start - GST_BUFFER_OFFSET (gst_buffer));
|
|
|
|
/* This buffer is partially within the requested range, clip the beginning */
|
|
gst_buffer_resize (gst_buffer,
|
|
request->range_start - GST_BUFFER_OFFSET (gst_buffer), -1);
|
|
GST_BUFFER_OFFSET (gst_buffer) = request->range_start;
|
|
}
|
|
}
|
|
|
|
if (gst_buffer != NULL) {
|
|
/* Unsent means cancellation is in progress, so don't override
|
|
* the state. Otherwise make sure it is LOADING */
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
|
|
request->state = DOWNLOAD_REQUEST_STATE_LOADING;
|
|
|
|
if (request->download_start_time == GST_CLOCK_TIME_NONE) {
|
|
GST_LOG ("Got first data for URI %s", request->uri);
|
|
request->download_start_time = now;
|
|
}
|
|
request->download_newest_data_time = now;
|
|
|
|
GST_LOG ("Adding %u bytes to buffer (request URI %s)",
|
|
(guint) (gst_buffer_get_size (gst_buffer)), request->uri);
|
|
|
|
download_request_add_buffer (request, gst_buffer);
|
|
|
|
transfer_task_report_progress (transfer_task);
|
|
}
|
|
} else if (read_failed) {
|
|
/* The read failed and returned 0 bytes: We're done */
|
|
goto finish_transfer;
|
|
}
|
|
|
|
/* Resubmit the read request to get more */
|
|
if (!new_read_buffer (transfer))
|
|
goto finish_transfer;
|
|
|
|
g_main_context_push_thread_default (dh->transfer_context);
|
|
g_input_stream_read_all_async (in, transfer->read_buffer,
|
|
transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
|
|
on_read_ready, transfer_task);
|
|
g_main_context_pop_thread_default (dh->transfer_context);
|
|
|
|
download_request_unlock (request);
|
|
return;
|
|
|
|
finish_transfer:
|
|
if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
|
|
SoupStatus status_code = _soup_message_get_status (transfer->msg);
|
|
|
|
GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, status_code, request->uri,
|
|
request->range_start, request->range_end);
|
|
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT) {
|
|
if (SOUP_STATUS_IS_SUCCESSFUL (status_code)
|
|
|| SOUP_STATUS_IS_REDIRECTION (status_code))
|
|
request->state = DOWNLOAD_REQUEST_STATE_COMPLETE;
|
|
else
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
}
|
|
}
|
|
request->download_end_time = now;
|
|
|
|
g_free (transfer->read_buffer);
|
|
transfer->read_buffer = NULL;
|
|
|
|
download_request_unlock (request);
|
|
|
|
finish_transfer_task (dh, transfer_task, NULL);
|
|
}
|
|
|
|
static void
|
|
http_header_to_structure (const gchar * name, const gchar * value,
|
|
gpointer user_data)
|
|
{
|
|
GstStructure *headers = user_data;
|
|
const GValue *gv;
|
|
|
|
if (!g_utf8_validate (name, -1, NULL) || !g_utf8_validate (value, -1, NULL))
|
|
return;
|
|
|
|
gv = gst_structure_get_value (headers, name);
|
|
if (gv && GST_VALUE_HOLDS_ARRAY (gv)) {
|
|
GValue v = G_VALUE_INIT;
|
|
|
|
g_value_init (&v, G_TYPE_STRING);
|
|
g_value_set_string (&v, value);
|
|
gst_value_array_append_value ((GValue *) gv, &v);
|
|
g_value_unset (&v);
|
|
} else if (gv && G_VALUE_HOLDS_STRING (gv)) {
|
|
GValue arr = G_VALUE_INIT;
|
|
GValue v = G_VALUE_INIT;
|
|
const gchar *old_value = g_value_get_string (gv);
|
|
|
|
g_value_init (&arr, GST_TYPE_ARRAY);
|
|
g_value_init (&v, G_TYPE_STRING);
|
|
g_value_set_string (&v, old_value);
|
|
gst_value_array_append_value (&arr, &v);
|
|
g_value_set_string (&v, value);
|
|
gst_value_array_append_value (&arr, &v);
|
|
|
|
gst_structure_set_value (headers, name, &arr);
|
|
g_value_unset (&v);
|
|
g_value_unset (&arr);
|
|
} else {
|
|
gst_structure_set (headers, name, G_TYPE_STRING, value, NULL);
|
|
}
|
|
}
|
|
|
|
static void
|
|
soup_msg_restarted_cb (SoupMessage * msg, gpointer user_data)
|
|
{
|
|
GTask *transfer_task = user_data;
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
DownloadRequest *request = transfer->request;
|
|
SoupStatus status = _soup_message_get_status (msg);
|
|
|
|
if (SOUP_STATUS_IS_REDIRECTION (status)) {
|
|
char *redirect_uri = gst_soup_message_uri_to_string (msg);
|
|
gboolean redirect_permanent = (status == SOUP_STATUS_MOVED_PERMANENTLY);
|
|
|
|
GST_DEBUG ("%u redirect to \"%s\" (permanent %d)",
|
|
status, redirect_uri, redirect_permanent);
|
|
|
|
download_request_lock (request);
|
|
g_free (request->redirect_uri);
|
|
request->redirect_uri = redirect_uri;
|
|
request->redirect_permanent = redirect_permanent;
|
|
download_request_unlock (request);
|
|
}
|
|
}
|
|
|
|
static GstStructure *
|
|
handle_response_headers (DownloadHelperTransfer * transfer)
|
|
{
|
|
DownloadRequest *request = transfer->request;
|
|
SoupMessage *msg = transfer->msg;
|
|
SoupMessageHeaders *response_headers;
|
|
GstStructure *http_headers, *headers;
|
|
|
|
http_headers = gst_structure_new_empty ("http-headers");
|
|
|
|
#if 0
|
|
if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED &&
|
|
src->proxy_id && src->proxy_pw) {
|
|
/* wait for authenticate callback */
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
if (src->redirection_uri)
|
|
gst_structure_set (http_headers, "redirection-uri", G_TYPE_STRING,
|
|
src->redirection_uri, NULL);
|
|
#endif
|
|
|
|
headers = gst_structure_new_empty ("request-headers");
|
|
_soup_message_headers_foreach (_soup_message_get_request_headers (msg),
|
|
http_header_to_structure, headers);
|
|
gst_structure_set (http_headers, "request-headers", GST_TYPE_STRUCTURE,
|
|
headers, NULL);
|
|
gst_structure_free (headers);
|
|
headers = gst_structure_new_empty ("response-headers");
|
|
response_headers = _soup_message_get_response_headers (msg);
|
|
_soup_message_headers_foreach (response_headers, http_header_to_structure,
|
|
headers);
|
|
gst_structure_set (http_headers, "response-headers", GST_TYPE_STRUCTURE,
|
|
headers, NULL);
|
|
gst_structure_free (headers);
|
|
|
|
#if 0
|
|
if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
|
|
/* force an error */
|
|
gst_structure_free (http_headers);
|
|
return gst_soup_http_src_parse_status (msg, src);
|
|
}
|
|
#endif
|
|
|
|
/* Parse Content-Length. */
|
|
if (SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (msg)) &&
|
|
(_soup_message_headers_get_encoding (response_headers) ==
|
|
SOUP_ENCODING_CONTENT_LENGTH)) {
|
|
request->content_length =
|
|
_soup_message_headers_get_content_length (response_headers);
|
|
}
|
|
/* Parse Content-Range in a partial content response to set our initial read_position */
|
|
transfer->read_position = 0;
|
|
if (_soup_message_get_status (msg) == SOUP_STATUS_PARTIAL_CONTENT) {
|
|
goffset start, end;
|
|
if (_soup_message_headers_get_content_range (response_headers, &start,
|
|
&end, NULL)) {
|
|
GST_DEBUG ("Content-Range response %" G_GOFFSET_FORMAT "-%"
|
|
G_GOFFSET_FORMAT, start, end);
|
|
transfer->read_position = start;
|
|
}
|
|
}
|
|
if (transfer->read_position != request->range_start) {
|
|
GST_WARNING ("Server did not respect our range request for range %"
|
|
G_GINT64_FORMAT " to %" G_GINT64_FORMAT " - starting at offset %"
|
|
G_GUINT64_FORMAT, request->range_start, request->range_end,
|
|
transfer->read_position);
|
|
}
|
|
|
|
return http_headers;
|
|
}
|
|
|
|
static void
|
|
on_request_sent (GObject * source, GAsyncResult * result, gpointer user_data)
|
|
{
|
|
GTask *transfer_task = user_data;
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
DownloadHelper *dh = transfer->dh;
|
|
DownloadRequest *request = transfer->request;
|
|
SoupMessage *msg = transfer->msg;
|
|
GError *error = NULL;
|
|
|
|
GInputStream *in =
|
|
_soup_session_send_finish ((SoupSession *) source, result, &error);
|
|
|
|
download_request_lock (request);
|
|
|
|
if (in == NULL) {
|
|
request->status_code = _soup_message_get_status (msg);
|
|
|
|
if (!g_cancellable_is_cancelled (transfer->cancellable)) {
|
|
GST_LOG ("request errored. Code %d URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, request->status_code, request->uri,
|
|
request->range_start, request->range_end);
|
|
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
finish_transfer_task (dh, transfer_task, error);
|
|
} else {
|
|
/* Ignore error from cancelled operation */
|
|
g_error_free (error);
|
|
finish_transfer_task (dh, transfer_task, NULL);
|
|
}
|
|
download_request_unlock (request);
|
|
|
|
/* No async callback queued - the transfer is done */
|
|
finish_transfer_task (dh, transfer_task, error);
|
|
return;
|
|
}
|
|
|
|
/* If the state went back to UNSENT, we were cancelled so don't override it */
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT &&
|
|
request->state != DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED) {
|
|
|
|
request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
|
|
request->status_code = _soup_message_get_status (msg);
|
|
request->headers = handle_response_headers (transfer);
|
|
GST_TRACE ("request URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT " headers: %" GST_PTR_FORMAT,
|
|
request->uri, request->range_start, request->range_end,
|
|
request->headers);
|
|
|
|
if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code)
|
|
|| SOUP_STATUS_IS_REDIRECTION (request->status_code)) {
|
|
request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
|
|
transfer_task_report_progress (transfer_task);
|
|
} else {
|
|
goto finish_transfer_error;
|
|
}
|
|
}
|
|
|
|
if (!new_read_buffer (transfer))
|
|
goto finish_transfer_error;
|
|
|
|
download_request_unlock (request);
|
|
|
|
g_main_context_push_thread_default (dh->transfer_context);
|
|
g_input_stream_read_all_async (in, transfer->read_buffer,
|
|
transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
|
|
on_read_ready, transfer_task);
|
|
g_main_context_pop_thread_default (dh->transfer_context);
|
|
|
|
g_object_unref (in);
|
|
return;
|
|
|
|
finish_transfer_error:
|
|
request->download_end_time = gst_adaptive_demux_clock_get_time (dh->clock);
|
|
|
|
if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
|
|
GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, _soup_message_get_status (msg), request->uri,
|
|
request->range_start, request->range_end);
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
}
|
|
|
|
g_free (transfer->read_buffer);
|
|
transfer->read_buffer = NULL;
|
|
|
|
download_request_unlock (request);
|
|
finish_transfer_task (dh, transfer_task, NULL);
|
|
g_object_unref (in);
|
|
}
|
|
|
|
DownloadHelper *
|
|
downloadhelper_new (GstAdaptiveDemuxClock * clock)
|
|
{
|
|
DownloadHelper *dh = g_new0 (DownloadHelper, 1);
|
|
|
|
dh->transfer_context = g_main_context_new ();
|
|
dh->loop = g_main_loop_new (dh->transfer_context, FALSE);
|
|
|
|
dh->clock = gst_adaptive_demux_clock_ref (clock);
|
|
|
|
g_mutex_init (&dh->transfer_lock);
|
|
dh->active_transfers = g_array_new (FALSE, FALSE, sizeof (GTask *));
|
|
|
|
g_array_set_clear_func (dh->active_transfers,
|
|
(GDestroyNotify) (release_transfer_task_by_ref));
|
|
|
|
dh->transfer_requests =
|
|
g_async_queue_new_full ((GDestroyNotify) g_object_unref);
|
|
dh->transfer_requests_source = NULL;
|
|
|
|
/* libsoup 3.0 (not 2.74 or 3.1) dispatches using a single source attached
|
|
* when the session is created, so we need to ensure it matches here. */
|
|
g_main_context_push_thread_default (dh->transfer_context);
|
|
|
|
/* Set 10 second timeout. Any longer is likely
|
|
* an attempt to reuse an already closed connection */
|
|
dh->session = _soup_session_new_with_options ("timeout", 10, NULL);
|
|
|
|
g_main_context_pop_thread_default (dh->transfer_context);
|
|
|
|
return dh;
|
|
}
|
|
|
|
void
|
|
downloadhelper_free (DownloadHelper * dh)
|
|
{
|
|
downloadhelper_stop (dh);
|
|
|
|
if (dh->session)
|
|
g_object_unref (dh->session);
|
|
g_main_loop_unref (dh->loop);
|
|
g_main_context_unref (dh->transfer_context);
|
|
|
|
if (dh->clock)
|
|
gst_adaptive_demux_clock_unref (dh->clock);
|
|
|
|
g_array_free (dh->active_transfers, TRUE);
|
|
g_async_queue_unref (dh->transfer_requests);
|
|
|
|
g_free (dh->referer);
|
|
g_free (dh->user_agent);
|
|
g_strfreev (dh->cookies);
|
|
|
|
g_free (dh);
|
|
}
|
|
|
|
void
|
|
downloadhelper_set_referer (DownloadHelper * dh, const gchar * referer)
|
|
{
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
g_free (dh->referer);
|
|
dh->referer = g_strdup (referer);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
void
|
|
downloadhelper_set_user_agent (DownloadHelper * dh, const gchar * user_agent)
|
|
{
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
g_free (dh->user_agent);
|
|
dh->user_agent = g_strdup (user_agent);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
/* Takes ownership of the strv */
|
|
void
|
|
downloadhelper_set_cookies (DownloadHelper * dh, gchar ** cookies)
|
|
{
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
g_strfreev (dh->cookies);
|
|
dh->cookies = cookies;
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
/* Called with the transfer lock held */
|
|
static void
|
|
submit_transfer (DownloadHelper * dh, GTask * transfer_task)
|
|
{
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
download_request_lock (request);
|
|
request->state = DOWNLOAD_REQUEST_STATE_OPEN;
|
|
request->download_request_time =
|
|
gst_adaptive_demux_clock_get_time (dh->clock);
|
|
|
|
GST_LOG ("Submitting request URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, request->uri, request->range_start, request->range_end);
|
|
|
|
transfer_task_report_progress (transfer_task);
|
|
download_request_unlock (request);
|
|
|
|
_soup_session_send_async (dh->session, transfer->msg, transfer->cancellable,
|
|
on_request_sent, transfer_task);
|
|
g_array_append_val (dh->active_transfers, transfer_task);
|
|
}
|
|
|
|
/* Idle callback that submits all pending transfers */
|
|
static gboolean
|
|
submit_transfers_cb (DownloadHelper * dh)
|
|
{
|
|
GTask *transfer;
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
do {
|
|
transfer = g_async_queue_try_pop (dh->transfer_requests);
|
|
if (transfer) {
|
|
submit_transfer (dh, transfer);
|
|
}
|
|
} while (transfer != NULL);
|
|
|
|
/* FIXME: Use a PollFD like GWakeup instead? */
|
|
g_source_destroy (dh->transfer_requests_source);
|
|
g_source_unref (dh->transfer_requests_source);
|
|
dh->transfer_requests_source = NULL;
|
|
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
return G_SOURCE_REMOVE;
|
|
}
|
|
|
|
static gpointer
|
|
dh_transfer_thread_func (gpointer data)
|
|
{
|
|
DownloadHelper *dh = data;
|
|
GST_DEBUG ("DownloadHelper thread starting");
|
|
|
|
g_main_context_push_thread_default (dh->transfer_context);
|
|
g_main_loop_run (dh->loop);
|
|
g_main_context_pop_thread_default (dh->transfer_context);
|
|
|
|
GST_DEBUG ("Exiting DownloadHelper thread");
|
|
return NULL;
|
|
}
|
|
|
|
gboolean
|
|
downloadhelper_start (DownloadHelper * dh)
|
|
{
|
|
g_return_val_if_fail (dh->transfer_thread == NULL, FALSE);
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
if (!dh->running) {
|
|
|
|
dh->transfer_thread =
|
|
g_thread_try_new ("adaptive-download-task", dh_transfer_thread_func, dh,
|
|
NULL);
|
|
dh->running = (dh->transfer_thread != NULL);
|
|
}
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
return dh->running;
|
|
}
|
|
|
|
void
|
|
downloadhelper_stop (DownloadHelper * dh)
|
|
{
|
|
int i;
|
|
GThread *transfer_thread = NULL;
|
|
|
|
GST_DEBUG ("Stopping DownloadHelper loop");
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
|
|
dh->running = FALSE;
|
|
|
|
for (i = 0; i < dh->active_transfers->len; i++) {
|
|
GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
g_cancellable_cancel (transfer->cancellable);
|
|
}
|
|
|
|
g_main_loop_quit (dh->loop);
|
|
|
|
transfer_thread = dh->transfer_thread;
|
|
dh->transfer_thread = NULL;
|
|
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
if (transfer_thread != NULL) {
|
|
g_thread_join (transfer_thread);
|
|
}
|
|
|
|
/* The transfer thread has exited at this point - any remaining transfers are unfinished
|
|
* and need cleaning up */
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
|
|
for (i = 0; i < dh->active_transfers->len; i++) {
|
|
GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
download_request_lock (request);
|
|
/* Reset the state to UNSENT, to indicate cancellation, like an XMLHttpRequest does */
|
|
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
|
|
download_request_unlock (request);
|
|
|
|
transfer->complete = TRUE;
|
|
if (transfer->blocking)
|
|
g_cond_broadcast (&transfer->cond);
|
|
else
|
|
g_task_return_boolean (transfer_task, TRUE);
|
|
}
|
|
|
|
g_array_set_size (dh->active_transfers, 0);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
gboolean
|
|
downloadhelper_submit_request (DownloadHelper * dh,
|
|
const gchar * referer, DownloadFlags flags, DownloadRequest * request,
|
|
GError ** err)
|
|
{
|
|
GTask *transfer_task = NULL;
|
|
const gchar *method;
|
|
SoupMessage *msg;
|
|
SoupMessageHeaders *msg_headers;
|
|
gboolean blocking = (flags & DOWNLOAD_FLAG_BLOCKING) != 0;
|
|
|
|
method =
|
|
(flags & DOWNLOAD_FLAG_HEADERS_ONLY) ? SOUP_METHOD_HEAD : SOUP_METHOD_GET;
|
|
|
|
download_request_lock (request);
|
|
if (request->in_use) {
|
|
GST_ERROR ("Request for URI %s reusing active request object",
|
|
request->uri);
|
|
download_request_unlock (request);
|
|
return FALSE;
|
|
}
|
|
|
|
/* Clear the state back to unsent */
|
|
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
|
|
|
|
msg = _soup_message_new (method, request->uri);
|
|
if (msg == NULL) {
|
|
g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
|
|
"Could not parse download URI %s", request->uri);
|
|
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
download_request_unlock (request);
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
/* NOTE: There was a bug where Akamai servers return the
|
|
* wrong result for a range request on small files. To avoid
|
|
* it if the range starts within the first KB of the file, just
|
|
* start at 0 instead */
|
|
if (request->range_start < 1024)
|
|
request->range_start = 0;
|
|
|
|
msg_headers = _soup_message_get_request_headers (msg);
|
|
|
|
if (request->range_start != 0 || request->range_end != -1) {
|
|
_soup_message_headers_set_range (msg_headers, request->range_start,
|
|
request->range_end);
|
|
}
|
|
|
|
download_request_unlock (request);
|
|
|
|
/* If resubmitting a request, clear any stale / unused data */
|
|
download_request_begin_download (request);
|
|
|
|
if ((flags & DOWNLOAD_FLAG_COMPRESS) == 0) {
|
|
_soup_message_disable_feature (msg, _soup_content_decoder_get_type ());
|
|
}
|
|
if (flags & DOWNLOAD_FLAG_FORCE_REFRESH) {
|
|
_soup_message_headers_append (msg_headers, "Cache-Control", "max-age=0");
|
|
}
|
|
|
|
/* Take the lock to protect header strings */
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
|
|
if (referer != NULL) {
|
|
_soup_message_headers_append (msg_headers, "Referer", referer);
|
|
} else if (dh->referer != NULL) {
|
|
_soup_message_headers_append (msg_headers, "Referer", dh->referer);
|
|
}
|
|
|
|
if (dh->user_agent != NULL) {
|
|
_soup_message_headers_append (msg_headers, "User-Agent", dh->user_agent);
|
|
}
|
|
|
|
if (dh->cookies != NULL) {
|
|
gchar **cookie;
|
|
|
|
for (cookie = dh->cookies; *cookie != NULL; cookie++) {
|
|
_soup_message_headers_append (msg_headers, "Cookie", *cookie);
|
|
}
|
|
}
|
|
|
|
transfer_task = transfer_task_new (dh, request, msg, blocking);
|
|
|
|
if (!dh->running) {
|
|
/* The download helper was deactivated just as we went to dispatch this request.
|
|
* Abort and manually wake the request, as it never went in the active_transfer list */
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
download_request_lock (request);
|
|
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
|
|
request->in_use = FALSE;
|
|
download_request_unlock (request);
|
|
|
|
g_cancellable_cancel (g_task_get_cancellable (transfer_task));
|
|
g_task_return_error_if_cancelled (transfer_task);
|
|
g_object_unref (transfer_task);
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
download_request_lock (request);
|
|
request->in_use = TRUE;
|
|
download_request_unlock (request);
|
|
|
|
g_signal_connect (msg, "restarted", G_CALLBACK (soup_msg_restarted_cb),
|
|
transfer_task);
|
|
|
|
/* Now send the request over to the main loop for actual submission */
|
|
GST_LOG ("Submitting transfer task %p", transfer_task);
|
|
g_async_queue_push (dh->transfer_requests, transfer_task);
|
|
|
|
/* No pending idle source to wake the transfer loop - so create one */
|
|
if (dh->transfer_requests_source == NULL) {
|
|
dh->transfer_requests_source = g_idle_source_new ();
|
|
g_source_set_callback (dh->transfer_requests_source,
|
|
(GSourceFunc) submit_transfers_cb, dh, NULL);
|
|
g_source_attach (dh->transfer_requests_source, dh->transfer_context);
|
|
}
|
|
|
|
if (blocking) {
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
/* We need an extra ref on the task to make sure it stays alive.
|
|
* We pushed it in the async queue, but didn't unlock yet, so while
|
|
* we gave away our ref, the receiver can't have unreffed it */
|
|
g_object_ref (transfer_task);
|
|
while (!transfer->complete)
|
|
g_cond_wait (&transfer->cond, &dh->transfer_lock);
|
|
g_object_unref (transfer_task);
|
|
}
|
|
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
void
|
|
downloadhelper_cancel_request (DownloadHelper * dh, DownloadRequest * request)
|
|
{
|
|
int i;
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
|
|
download_request_lock (request);
|
|
if (!request->in_use)
|
|
goto out;
|
|
|
|
GST_DEBUG ("Cancelling request for URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, request->uri, request->range_start, request->range_end);
|
|
|
|
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
|
|
|
|
for (i = dh->active_transfers->len - 1; i >= 0; i--) {
|
|
GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
if (transfer->request == request) {
|
|
GST_DEBUG ("Found transfer %p for request for URI %s range %"
|
|
G_GINT64_FORMAT " %" G_GINT64_FORMAT, transfer, request->uri,
|
|
request->range_start, request->range_end);
|
|
g_cancellable_cancel (transfer->cancellable);
|
|
break;
|
|
}
|
|
}
|
|
|
|
out:
|
|
download_request_unlock (request);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
DownloadRequest *
|
|
downloadhelper_fetch_uri_range (DownloadHelper * dh, const gchar * uri,
|
|
const gchar * referer, DownloadFlags flags, gint64 range_start,
|
|
gint64 range_end, GError ** err)
|
|
{
|
|
DownloadRequest *request;
|
|
|
|
g_return_val_if_fail (uri != NULL, NULL);
|
|
|
|
GST_DEBUG ("Fetching URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT,
|
|
uri, range_start, range_end);
|
|
|
|
flags |= DOWNLOAD_FLAG_BLOCKING;
|
|
|
|
request = download_request_new_uri_range (uri, range_start, range_end);
|
|
|
|
if (!downloadhelper_submit_request (dh, referer, flags, request, err)) {
|
|
download_request_unref (request);
|
|
return NULL;
|
|
}
|
|
|
|
return request;
|
|
}
|
|
|
|
DownloadRequest *
|
|
downloadhelper_fetch_uri (DownloadHelper * dh, const gchar * uri,
|
|
const gchar * referer, DownloadFlags flags, GError ** err)
|
|
{
|
|
return downloadhelper_fetch_uri_range (dh, uri, referer, flags, 0, -1, err);
|
|
}
|