mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-02 21:48:55 +00:00
049859c2cb
There is a race condition where the transfer has not been submitted yet when the request is cancelled, which leads to the transfer state going back to `DOWNLOAD_REQUEST_STATE_OPEN` and to the user of the request being signalled about its completion (and the task actually happening after it was cancelled), leading to assertions and misbehaviours. To ensure that this race can't happen, we start differentiating between the UNSENT and CANCELLED states: in the normal case, when entering `submit_request`, the state is UNSENT, and at that point we need to know that this is not because the request has been cancelled. In practice this case led to an assertion in `gst_adaptive_demux2_stream_begin_download_uri` because in a previous call to `gst_adaptive_demux2_stream_stop_default` we cancelled the previous request and set up a new one while it had not been submitted yet, and then got an `on_download_complete` callback called from that previous cancelled request, and then we tried to do `download_request_set_uri` on a request that was still `in_use`, leading to something like: ``` #0: 0x0000000186655ec8 g_assert (request->in_use == FALSE)assert.c:0 #1: 0x00000001127236b8 libgstadaptivedemux2.dylib`download_request_set_uri(request=0x000060000017cc00, uri="https://XXX/chunk-stream1-00002.webm", range_start=0, range_end=-1) at downloadrequest.c:361 #2: 0x000000011271cee8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_begin_download_uri(stream=0x00000001330f1800, uri="https://XXX/chunk-stream1-00002.webm", start=0, end=-1) at gstadaptivedemux-stream.c:1447 #3: 0x0000000112719898 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment [inlined] gst_adaptive_demux2_stream_download_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:0 #4: 0x00000001127197f8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:1969 #5: 0x000000011271c2a4 
libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_next_download(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:2112 ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5435>
1029 lines
30 KiB
C
1029 lines
30 KiB
C
/* GStreamer
|
|
* Copyright (C) 2021-2022 Jan Schmidt <jan@centricular.com>
|
|
*
|
|
* downloadhelper.c:
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#include "downloadhelper.h"
|
|
#include "../soup/gstsouploader.h"
|
|
|
|
GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
|
|
#define GST_CAT_DEFAULT adaptivedemux2_debug
|
|
|
|
#define CHUNK_BUFFER_SIZE 32768
|
|
|
|
typedef struct DownloadHelperTransfer DownloadHelperTransfer;
|
|
|
|
/* State shared by all transfers handled by one download helper instance. */
struct DownloadHelper
{
  /* Thread running the transfer main loop; created in downloadhelper_start()
   * and joined in downloadhelper_stop() */
  GThread *transfer_thread;

  /* TRUE once the transfer thread has been created; cleared in
   * downloadhelper_stop(). Written with transfer_lock held */
  gboolean running;

  /* Clock used to timestamp request / download progress */
  GstAdaptiveDemuxClock *clock;

  /* Main context and loop that the transfer thread runs, and the soup
   * session created while that context was the thread default */
  GMainContext *transfer_context;
  GMainLoop *loop;
  SoupSession *session;

  /* Taken around accesses to active_transfers, the header strings below and
   * transfer_requests_source */
  GMutex transfer_lock;
  /* Array of GTask * for transfers that have been submitted to the session.
   * Removing an entry drops one task reference (see the array clear func) */
  GArray *active_transfers;

  /* Queue of GTask * waiting to be submitted by the transfer loop */
  GAsyncQueue *transfer_requests;
  /* Idle source that drains transfer_requests, or NULL if none is pending */
  GSource *transfer_requests_source;

  /* Optional values added to outgoing requests as Referer, User-Agent and
   * Cookie headers */
  gchar *referer;
  gchar *user_agent;
  gchar **cookies;
};
|
|
|
|
/* Per-request transfer state, attached to a GTask as its task data. */
struct DownloadHelperTransfer
{
  /* Helper that owns this transfer */
  DownloadHelper *dh;

  /* TRUE if the submitter waits on cond until the transfer is complete */
  gboolean blocking;
  /* Set once the transfer finished (or was cleaned up when stopping) */
  gboolean complete;
  /* TRUE while an idle progress callback is attached but has not run yet */
  gboolean progress_pending;

  /* Only initialised for blocking transfers; waited on with the helper's
   * transfer_lock */
  GCond cond;

  /* Cancellable passed to the GTask and to the soup / stream operations */
  GCancellable *cancellable;

  /* Message for this transfer; unreffed when the transfer is freed */
  SoupMessage *msg;
  /* NOTE(review): not referenced anywhere in the visible part of this file */
  gboolean request_sent;

  /* Current read buffer */
  char *read_buffer;
  guint64 read_buffer_size;
  guint64 read_position;        /* Start in bytes of the read_buffer */

  /* Request being served; the transfer holds its own reference */
  DownloadRequest *request;
};
|
|
|
|
static void
|
|
free_transfer (DownloadHelperTransfer * transfer)
|
|
{
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
if (request)
|
|
download_request_unref (request);
|
|
|
|
if (transfer->blocking)
|
|
g_cond_clear (&transfer->cond);
|
|
|
|
g_object_unref (transfer->msg);
|
|
g_free (transfer->read_buffer);
|
|
g_free (transfer);
|
|
}
|
|
|
|
static void
|
|
transfer_completion_cb (gpointer src_object, GAsyncResult * res,
|
|
gpointer user_data)
|
|
{
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data ((GTask *) res);
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
if (transfer->blocking)
|
|
return;
|
|
|
|
download_request_lock (request);
|
|
request->in_use = FALSE;
|
|
GST_LOG ("Despatching completion for transfer %p request %p", transfer,
|
|
request);
|
|
download_request_despatch_completion (request);
|
|
download_request_unlock (request);
|
|
}
|
|
|
|
static gboolean
|
|
transfer_report_progress_cb (gpointer task)
|
|
{
|
|
DownloadHelperTransfer *transfer;
|
|
DownloadRequest *request;
|
|
|
|
/* Already completed - late callback */
|
|
if (g_task_get_completed (task))
|
|
return FALSE;
|
|
|
|
transfer = g_task_get_task_data (task);
|
|
request = transfer->request;
|
|
|
|
download_request_lock (request);
|
|
if (request->send_progress) {
|
|
GST_LOG ("Despatching progress for transfer %p request %p", transfer,
|
|
request);
|
|
download_request_despatch_progress (request);
|
|
}
|
|
transfer->progress_pending = FALSE;
|
|
download_request_unlock (request);
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
static GTask *
|
|
transfer_task_new (DownloadHelper * dh, DownloadRequest * request,
|
|
SoupMessage * msg, gboolean blocking)
|
|
{
|
|
GTask *transfer_task = NULL;
|
|
DownloadHelperTransfer *transfer = g_new0 (DownloadHelperTransfer, 1);
|
|
|
|
transfer->blocking = blocking;
|
|
if (transfer->blocking)
|
|
g_cond_init (&transfer->cond);
|
|
|
|
transfer->cancellable = g_cancellable_new ();
|
|
transfer->request = download_request_ref (request);
|
|
|
|
transfer->dh = dh;
|
|
transfer->msg = msg;
|
|
|
|
transfer_task =
|
|
g_task_new (NULL, transfer->cancellable,
|
|
(GAsyncReadyCallback) transfer_completion_cb, NULL);
|
|
g_task_set_task_data (transfer_task, transfer,
|
|
(GDestroyNotify) free_transfer);
|
|
|
|
return transfer_task;
|
|
}
|
|
|
|
static void
|
|
release_transfer_task_by_ref (GTask ** transfer_task)
|
|
{
|
|
g_object_unref (*transfer_task);
|
|
}
|
|
|
|
/* Called with download_request lock held */
|
|
static void
|
|
transfer_task_report_progress (GTask * transfer_task)
|
|
{
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
DownloadRequest *request = transfer->request;
|
|
GSource *idle_source;
|
|
|
|
if (transfer->progress_pending == TRUE || !request->send_progress)
|
|
return;
|
|
|
|
/* There's no progress cb pending and this download wants reports, so
|
|
* attach an idle source */
|
|
transfer->progress_pending = TRUE;
|
|
idle_source = g_idle_source_new ();
|
|
g_task_attach_source (transfer_task, idle_source,
|
|
transfer_report_progress_cb);
|
|
g_source_unref (idle_source);
|
|
}
|
|
|
|
static void
|
|
finish_transfer_task (DownloadHelper * dh, GTask * transfer_task,
|
|
GError * error)
|
|
{
|
|
int i;
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
for (i = dh->active_transfers->len - 1; i >= 0; i--) {
|
|
if (transfer_task == g_array_index (dh->active_transfers, GTask *, i)) {
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
transfer->complete = TRUE;
|
|
|
|
if (transfer->blocking)
|
|
g_cond_broadcast (&transfer->cond);
|
|
|
|
if (error != NULL)
|
|
g_task_return_error (transfer_task, error);
|
|
else
|
|
g_task_return_boolean (transfer_task, TRUE);
|
|
|
|
/* This drops the task ref: */
|
|
g_array_remove_index_fast (dh->active_transfers, i);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
return;
|
|
}
|
|
}
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
GST_WARNING ("Did not find transfer %p in the active transfer list",
|
|
transfer_task);
|
|
}
|
|
|
|
static gboolean
|
|
new_read_buffer (DownloadHelperTransfer * transfer)
|
|
{
|
|
gint buffer_size = CHUNK_BUFFER_SIZE;
|
|
#if 0
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
if (request->range_end != -1) {
|
|
if (request->range_end <= transfer->read_position) {
|
|
transfer->read_buffer = NULL;
|
|
transfer->read_buffer_size = 0;
|
|
return FALSE;
|
|
}
|
|
if (request->range_end - transfer->read_position < buffer_size)
|
|
buffer_size = request->range_end - transfer->read_position + 1;
|
|
}
|
|
#endif
|
|
|
|
transfer->read_buffer = g_new (char, buffer_size);
|
|
transfer->read_buffer_size = buffer_size;
|
|
return TRUE;
|
|
}
|
|
|
|
static void
|
|
on_read_ready (GObject * source, GAsyncResult * result, gpointer user_data)
|
|
{
|
|
GTask *transfer_task = user_data;
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
DownloadHelper *dh = transfer->dh;
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
GInputStream *in = G_INPUT_STREAM (source);
|
|
GError *error = NULL;
|
|
gsize bytes_read = 0;
|
|
|
|
GstClockTime now = gst_adaptive_demux_clock_get_time (dh->clock);
|
|
|
|
gboolean read_failed =
|
|
g_input_stream_read_all_finish (in, result, &bytes_read, &error);
|
|
|
|
download_request_lock (request);
|
|
|
|
if (error) {
|
|
g_free (transfer->read_buffer);
|
|
transfer->read_buffer = NULL;
|
|
|
|
if (!g_cancellable_is_cancelled (transfer->cancellable)) {
|
|
GST_ERROR ("Failed to read stream: %s", error->message);
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED)
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
finish_transfer_task (dh, transfer_task, error);
|
|
} else {
|
|
/* Ignore error from cancelled operation */
|
|
g_error_free (error);
|
|
finish_transfer_task (dh, transfer_task, NULL);
|
|
}
|
|
download_request_unlock (request);
|
|
|
|
return;
|
|
}
|
|
|
|
if (bytes_read > 0) {
|
|
GstBuffer *gst_buffer =
|
|
gst_buffer_new_wrapped (transfer->read_buffer, bytes_read);
|
|
|
|
GST_BUFFER_OFFSET (gst_buffer) = transfer->read_position;
|
|
transfer->read_position += bytes_read;
|
|
transfer->read_buffer = NULL;
|
|
|
|
/* Clip the buffer to within the range */
|
|
if (GST_BUFFER_OFFSET (gst_buffer) < request->range_start) {
|
|
if (transfer->read_position <= request->range_start) {
|
|
GST_DEBUG ("Discarding %" G_GSIZE_FORMAT
|
|
" bytes entirely before requested range",
|
|
gst_buffer_get_size (gst_buffer));
|
|
/* This buffer is completely before the range start, discard it */
|
|
gst_buffer_unref (gst_buffer);
|
|
gst_buffer = NULL;
|
|
} else {
|
|
GST_DEBUG ("Clipping first %" G_GINT64_FORMAT
|
|
" bytes before requested range",
|
|
request->range_start - GST_BUFFER_OFFSET (gst_buffer));
|
|
|
|
/* This buffer is partially within the requested range, clip the beginning */
|
|
gst_buffer_resize (gst_buffer,
|
|
request->range_start - GST_BUFFER_OFFSET (gst_buffer), -1);
|
|
GST_BUFFER_OFFSET (gst_buffer) = request->range_start;
|
|
}
|
|
}
|
|
|
|
if (gst_buffer != NULL) {
|
|
/* Don't override CANCELLED state. Otherwise make sure it is LOADING */
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED)
|
|
request->state = DOWNLOAD_REQUEST_STATE_LOADING;
|
|
|
|
if (request->download_start_time == GST_CLOCK_TIME_NONE) {
|
|
GST_LOG ("Got first data for URI %s", request->uri);
|
|
request->download_start_time = now;
|
|
}
|
|
request->download_newest_data_time = now;
|
|
|
|
GST_LOG ("Adding %u bytes to buffer (request URI %s)",
|
|
(guint) (gst_buffer_get_size (gst_buffer)), request->uri);
|
|
|
|
download_request_add_buffer (request, gst_buffer);
|
|
|
|
transfer_task_report_progress (transfer_task);
|
|
}
|
|
} else if (read_failed) {
|
|
/* The read failed and returned 0 bytes: We're done */
|
|
goto finish_transfer;
|
|
}
|
|
|
|
/* Resubmit the read request to get more */
|
|
if (!new_read_buffer (transfer))
|
|
goto finish_transfer;
|
|
|
|
g_main_context_push_thread_default (dh->transfer_context);
|
|
g_input_stream_read_all_async (in, transfer->read_buffer,
|
|
transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
|
|
on_read_ready, transfer_task);
|
|
g_main_context_pop_thread_default (dh->transfer_context);
|
|
|
|
download_request_unlock (request);
|
|
return;
|
|
|
|
finish_transfer:
|
|
if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
|
|
SoupStatus status_code = _soup_message_get_status (transfer->msg);
|
|
#ifndef GST_DISABLE_GST_DEBUG
|
|
guint download_ms = (now - request->download_request_time) / GST_MSECOND;
|
|
GST_LOG ("request complete in %u ms. Code %d URI %s range %" G_GINT64_FORMAT
|
|
" %" G_GINT64_FORMAT, download_ms, status_code,
|
|
request->uri, request->range_start, request->range_end);
|
|
#endif
|
|
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED) {
|
|
if (SOUP_STATUS_IS_SUCCESSFUL (status_code)
|
|
|| SOUP_STATUS_IS_REDIRECTION (status_code)) {
|
|
request->state = DOWNLOAD_REQUEST_STATE_COMPLETE;
|
|
} else {
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
}
|
|
}
|
|
}
|
|
request->download_end_time = now;
|
|
|
|
g_free (transfer->read_buffer);
|
|
transfer->read_buffer = NULL;
|
|
|
|
download_request_unlock (request);
|
|
|
|
finish_transfer_task (dh, transfer_task, NULL);
|
|
}
|
|
|
|
static void
|
|
http_header_to_structure (const gchar * name, const gchar * value,
|
|
gpointer user_data)
|
|
{
|
|
GstStructure *headers = user_data;
|
|
const GValue *gv;
|
|
|
|
if (!g_utf8_validate (name, -1, NULL) || !g_utf8_validate (value, -1, NULL))
|
|
return;
|
|
|
|
gv = gst_structure_get_value (headers, name);
|
|
if (gv && GST_VALUE_HOLDS_ARRAY (gv)) {
|
|
GValue v = G_VALUE_INIT;
|
|
|
|
g_value_init (&v, G_TYPE_STRING);
|
|
g_value_set_string (&v, value);
|
|
gst_value_array_append_value ((GValue *) gv, &v);
|
|
g_value_unset (&v);
|
|
} else if (gv && G_VALUE_HOLDS_STRING (gv)) {
|
|
GValue arr = G_VALUE_INIT;
|
|
GValue v = G_VALUE_INIT;
|
|
const gchar *old_value = g_value_get_string (gv);
|
|
|
|
g_value_init (&arr, GST_TYPE_ARRAY);
|
|
g_value_init (&v, G_TYPE_STRING);
|
|
g_value_set_string (&v, old_value);
|
|
gst_value_array_append_value (&arr, &v);
|
|
g_value_set_string (&v, value);
|
|
gst_value_array_append_value (&arr, &v);
|
|
|
|
gst_structure_set_value (headers, name, &arr);
|
|
g_value_unset (&v);
|
|
g_value_unset (&arr);
|
|
} else {
|
|
gst_structure_set (headers, name, G_TYPE_STRING, value, NULL);
|
|
}
|
|
}
|
|
|
|
static void
|
|
soup_msg_restarted_cb (SoupMessage * msg, gpointer user_data)
|
|
{
|
|
GTask *transfer_task = user_data;
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
DownloadRequest *request = transfer->request;
|
|
SoupStatus status = _soup_message_get_status (msg);
|
|
|
|
if (SOUP_STATUS_IS_REDIRECTION (status)) {
|
|
char *redirect_uri = gst_soup_message_uri_to_string (msg);
|
|
gboolean redirect_permanent = (status == SOUP_STATUS_MOVED_PERMANENTLY);
|
|
|
|
GST_DEBUG ("%u redirect to \"%s\" (permanent %d)",
|
|
status, redirect_uri, redirect_permanent);
|
|
|
|
download_request_lock (request);
|
|
g_free (request->redirect_uri);
|
|
request->redirect_uri = redirect_uri;
|
|
request->redirect_permanent = redirect_permanent;
|
|
download_request_unlock (request);
|
|
}
|
|
}
|
|
|
|
static GstStructure *
|
|
handle_response_headers (DownloadHelperTransfer * transfer)
|
|
{
|
|
DownloadRequest *request = transfer->request;
|
|
SoupMessage *msg = transfer->msg;
|
|
SoupMessageHeaders *response_headers;
|
|
GstStructure *http_headers, *headers;
|
|
|
|
http_headers = gst_structure_new_empty ("http-headers");
|
|
|
|
#if 0
|
|
if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED &&
|
|
src->proxy_id && src->proxy_pw) {
|
|
/* wait for authenticate callback */
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
if (src->redirection_uri)
|
|
gst_structure_set (http_headers, "redirection-uri", G_TYPE_STRING,
|
|
src->redirection_uri, NULL);
|
|
#endif
|
|
|
|
headers = gst_structure_new_empty ("request-headers");
|
|
_soup_message_headers_foreach (_soup_message_get_request_headers (msg),
|
|
http_header_to_structure, headers);
|
|
gst_structure_set (http_headers, "request-headers", GST_TYPE_STRUCTURE,
|
|
headers, NULL);
|
|
gst_structure_free (headers);
|
|
headers = gst_structure_new_empty ("response-headers");
|
|
response_headers = _soup_message_get_response_headers (msg);
|
|
_soup_message_headers_foreach (response_headers, http_header_to_structure,
|
|
headers);
|
|
gst_structure_set (http_headers, "response-headers", GST_TYPE_STRUCTURE,
|
|
headers, NULL);
|
|
gst_structure_free (headers);
|
|
|
|
#if 0
|
|
if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
|
|
/* force an error */
|
|
gst_structure_free (http_headers);
|
|
return gst_soup_http_src_parse_status (msg, src);
|
|
}
|
|
#endif
|
|
|
|
/* Parse Content-Length. */
|
|
if (SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (msg)) &&
|
|
(_soup_message_headers_get_encoding (response_headers) ==
|
|
SOUP_ENCODING_CONTENT_LENGTH)) {
|
|
request->content_length =
|
|
_soup_message_headers_get_content_length (response_headers);
|
|
}
|
|
/* Parse Content-Range in a partial content response to set our initial read_position */
|
|
transfer->read_position = 0;
|
|
if (_soup_message_get_status (msg) == SOUP_STATUS_PARTIAL_CONTENT) {
|
|
goffset start, end;
|
|
if (_soup_message_headers_get_content_range (response_headers, &start,
|
|
&end, NULL)) {
|
|
GST_DEBUG ("Content-Range response %" G_GOFFSET_FORMAT "-%"
|
|
G_GOFFSET_FORMAT, start, end);
|
|
transfer->read_position = start;
|
|
}
|
|
}
|
|
if (transfer->read_position != request->range_start) {
|
|
GST_WARNING ("Server did not respect our range request for range %"
|
|
G_GINT64_FORMAT " to %" G_GINT64_FORMAT " - starting at offset %"
|
|
G_GUINT64_FORMAT, request->range_start, request->range_end,
|
|
transfer->read_position);
|
|
}
|
|
|
|
return http_headers;
|
|
}
|
|
|
|
static void
|
|
on_request_sent (GObject * source, GAsyncResult * result, gpointer user_data)
|
|
{
|
|
GTask *transfer_task = user_data;
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
DownloadHelper *dh = transfer->dh;
|
|
DownloadRequest *request = transfer->request;
|
|
SoupMessage *msg = transfer->msg;
|
|
GError *error = NULL;
|
|
|
|
GInputStream *in =
|
|
_soup_session_send_finish ((SoupSession *) source, result, &error);
|
|
|
|
download_request_lock (request);
|
|
|
|
if (in == NULL) {
|
|
request->status_code = _soup_message_get_status (msg);
|
|
|
|
if (!g_cancellable_is_cancelled (transfer->cancellable)) {
|
|
GST_LOG ("request errored. Code %d URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, request->status_code, request->uri,
|
|
request->range_start, request->range_end);
|
|
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED)
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
finish_transfer_task (dh, transfer_task, error);
|
|
} else {
|
|
/* Ignore error from cancelled operation */
|
|
g_error_free (error);
|
|
finish_transfer_task (dh, transfer_task, NULL);
|
|
}
|
|
download_request_unlock (request);
|
|
|
|
/* No async callback queued - the transfer is done */
|
|
finish_transfer_task (dh, transfer_task, error);
|
|
return;
|
|
}
|
|
|
|
/* If the state is cancelled don't override it */
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED &&
|
|
request->state != DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED) {
|
|
|
|
request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
|
|
request->status_code = _soup_message_get_status (msg);
|
|
request->headers = handle_response_headers (transfer);
|
|
GST_TRACE ("request URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT " headers: %" GST_PTR_FORMAT,
|
|
request->uri, request->range_start, request->range_end,
|
|
request->headers);
|
|
|
|
if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code)
|
|
|| SOUP_STATUS_IS_REDIRECTION (request->status_code)) {
|
|
request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
|
|
transfer_task_report_progress (transfer_task);
|
|
} else {
|
|
goto finish_transfer_error;
|
|
}
|
|
}
|
|
|
|
if (!new_read_buffer (transfer))
|
|
goto finish_transfer_error;
|
|
|
|
download_request_unlock (request);
|
|
|
|
g_main_context_push_thread_default (dh->transfer_context);
|
|
g_input_stream_read_all_async (in, transfer->read_buffer,
|
|
transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
|
|
on_read_ready, transfer_task);
|
|
g_main_context_pop_thread_default (dh->transfer_context);
|
|
|
|
g_object_unref (in);
|
|
return;
|
|
|
|
finish_transfer_error:
|
|
request->download_end_time = gst_adaptive_demux_clock_get_time (dh->clock);
|
|
|
|
if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
|
|
GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, _soup_message_get_status (msg), request->uri,
|
|
request->range_start, request->range_end);
|
|
|
|
/* If the state is cancelled don't override it */
|
|
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED)
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
}
|
|
|
|
g_free (transfer->read_buffer);
|
|
transfer->read_buffer = NULL;
|
|
|
|
download_request_unlock (request);
|
|
finish_transfer_task (dh, transfer_task, NULL);
|
|
g_object_unref (in);
|
|
}
|
|
|
|
DownloadHelper *
|
|
downloadhelper_new (GstAdaptiveDemuxClock * clock)
|
|
{
|
|
DownloadHelper *dh = g_new0 (DownloadHelper, 1);
|
|
|
|
dh->transfer_context = g_main_context_new ();
|
|
dh->loop = g_main_loop_new (dh->transfer_context, FALSE);
|
|
|
|
dh->clock = gst_adaptive_demux_clock_ref (clock);
|
|
|
|
g_mutex_init (&dh->transfer_lock);
|
|
dh->active_transfers = g_array_new (FALSE, FALSE, sizeof (GTask *));
|
|
|
|
g_array_set_clear_func (dh->active_transfers,
|
|
(GDestroyNotify) (release_transfer_task_by_ref));
|
|
|
|
dh->transfer_requests =
|
|
g_async_queue_new_full ((GDestroyNotify) g_object_unref);
|
|
dh->transfer_requests_source = NULL;
|
|
|
|
/* libsoup 3.0 (not 2.74 or 3.1) dispatches using a single source attached
|
|
* when the session is created, so we need to ensure it matches here. */
|
|
g_main_context_push_thread_default (dh->transfer_context);
|
|
|
|
/* Set 10 second timeout. Any longer is likely
|
|
* an attempt to reuse an already closed connection */
|
|
dh->session = _soup_session_new_with_options ("timeout", 10, NULL);
|
|
|
|
g_main_context_pop_thread_default (dh->transfer_context);
|
|
|
|
return dh;
|
|
}
|
|
|
|
void
|
|
downloadhelper_free (DownloadHelper * dh)
|
|
{
|
|
downloadhelper_stop (dh);
|
|
|
|
if (dh->session)
|
|
g_object_unref (dh->session);
|
|
g_main_loop_unref (dh->loop);
|
|
g_main_context_unref (dh->transfer_context);
|
|
|
|
if (dh->clock)
|
|
gst_adaptive_demux_clock_unref (dh->clock);
|
|
|
|
g_array_free (dh->active_transfers, TRUE);
|
|
g_async_queue_unref (dh->transfer_requests);
|
|
|
|
g_free (dh->referer);
|
|
g_free (dh->user_agent);
|
|
g_strfreev (dh->cookies);
|
|
|
|
g_free (dh);
|
|
}
|
|
|
|
void
|
|
downloadhelper_set_referer (DownloadHelper * dh, const gchar * referer)
|
|
{
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
g_free (dh->referer);
|
|
dh->referer = g_strdup (referer);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
void
|
|
downloadhelper_set_user_agent (DownloadHelper * dh, const gchar * user_agent)
|
|
{
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
g_free (dh->user_agent);
|
|
dh->user_agent = g_strdup (user_agent);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
/* Takes ownership of the strv */
|
|
void
|
|
downloadhelper_set_cookies (DownloadHelper * dh, gchar ** cookies)
|
|
{
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
g_strfreev (dh->cookies);
|
|
dh->cookies = cookies;
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
/* Called with the transfer lock held */
|
|
static void
|
|
submit_transfer (DownloadHelper * dh, GTask * transfer_task)
|
|
{
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
download_request_lock (request);
|
|
if (request->state == DOWNLOAD_REQUEST_STATE_CANCELLED) {
|
|
download_request_unlock (request);
|
|
|
|
GST_DEBUG ("Don't submit already cancelled transfer");
|
|
return;
|
|
}
|
|
|
|
request->state = DOWNLOAD_REQUEST_STATE_OPEN;
|
|
request->download_request_time =
|
|
gst_adaptive_demux_clock_get_time (dh->clock);
|
|
|
|
GST_LOG ("Submitting request URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, request->uri, request->range_start, request->range_end);
|
|
|
|
transfer_task_report_progress (transfer_task);
|
|
download_request_unlock (request);
|
|
|
|
_soup_session_send_async (dh->session, transfer->msg, transfer->cancellable,
|
|
on_request_sent, transfer_task);
|
|
g_array_append_val (dh->active_transfers, transfer_task);
|
|
}
|
|
|
|
/* Idle callback that submits all pending transfers */
|
|
static gboolean
|
|
submit_transfers_cb (DownloadHelper * dh)
|
|
{
|
|
GTask *transfer;
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
do {
|
|
transfer = g_async_queue_try_pop (dh->transfer_requests);
|
|
if (transfer) {
|
|
submit_transfer (dh, transfer);
|
|
}
|
|
} while (transfer != NULL);
|
|
|
|
/* FIXME: Use a PollFD like GWakeup instead? */
|
|
g_source_destroy (dh->transfer_requests_source);
|
|
g_source_unref (dh->transfer_requests_source);
|
|
dh->transfer_requests_source = NULL;
|
|
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
return G_SOURCE_REMOVE;
|
|
}
|
|
|
|
static gpointer
|
|
dh_transfer_thread_func (gpointer data)
|
|
{
|
|
DownloadHelper *dh = data;
|
|
GST_DEBUG ("DownloadHelper thread starting");
|
|
|
|
g_main_context_push_thread_default (dh->transfer_context);
|
|
g_main_loop_run (dh->loop);
|
|
g_main_context_pop_thread_default (dh->transfer_context);
|
|
|
|
GST_DEBUG ("Exiting DownloadHelper thread");
|
|
return NULL;
|
|
}
|
|
|
|
gboolean
|
|
downloadhelper_start (DownloadHelper * dh)
|
|
{
|
|
g_return_val_if_fail (dh->transfer_thread == NULL, FALSE);
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
if (!dh->running) {
|
|
|
|
dh->transfer_thread =
|
|
g_thread_try_new ("adaptive-download-task", dh_transfer_thread_func, dh,
|
|
NULL);
|
|
dh->running = (dh->transfer_thread != NULL);
|
|
}
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
return dh->running;
|
|
}
|
|
|
|
void
|
|
downloadhelper_stop (DownloadHelper * dh)
|
|
{
|
|
int i;
|
|
GThread *transfer_thread = NULL;
|
|
|
|
GST_DEBUG ("Stopping DownloadHelper loop");
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
|
|
dh->running = FALSE;
|
|
|
|
for (i = 0; i < dh->active_transfers->len; i++) {
|
|
GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
g_cancellable_cancel (transfer->cancellable);
|
|
}
|
|
|
|
g_main_loop_quit (dh->loop);
|
|
|
|
transfer_thread = dh->transfer_thread;
|
|
dh->transfer_thread = NULL;
|
|
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
if (transfer_thread != NULL) {
|
|
g_thread_join (transfer_thread);
|
|
}
|
|
|
|
/* The transfer thread has exited at this point - any remaining transfers are unfinished
|
|
* and need cleaning up */
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
|
|
for (i = 0; i < dh->active_transfers->len; i++) {
|
|
GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
DownloadRequest *request = transfer->request;
|
|
|
|
download_request_lock (request);
|
|
request->state = DOWNLOAD_REQUEST_STATE_CANCELLED;
|
|
download_request_unlock (request);
|
|
|
|
transfer->complete = TRUE;
|
|
if (transfer->blocking)
|
|
g_cond_broadcast (&transfer->cond);
|
|
|
|
g_task_return_boolean (transfer_task, TRUE);
|
|
}
|
|
|
|
g_array_set_size (dh->active_transfers, 0);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
gboolean
|
|
downloadhelper_submit_request (DownloadHelper * dh,
|
|
const gchar * referer, DownloadFlags flags, DownloadRequest * request,
|
|
GError ** err)
|
|
{
|
|
GTask *transfer_task = NULL;
|
|
const gchar *method;
|
|
SoupMessage *msg;
|
|
SoupMessageHeaders *msg_headers;
|
|
gboolean blocking = (flags & DOWNLOAD_FLAG_BLOCKING) != 0;
|
|
|
|
method =
|
|
(flags & DOWNLOAD_FLAG_HEADERS_ONLY) ? SOUP_METHOD_HEAD : SOUP_METHOD_GET;
|
|
|
|
download_request_lock (request);
|
|
if (request->in_use) {
|
|
GST_ERROR ("Request for URI %s reusing active request object",
|
|
request->uri);
|
|
download_request_unlock (request);
|
|
return FALSE;
|
|
}
|
|
|
|
/* Clear the state back to unsent */
|
|
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
|
|
|
|
msg = _soup_message_new (method, request->uri);
|
|
if (msg == NULL) {
|
|
g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
|
|
"Could not parse download URI %s", request->uri);
|
|
|
|
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
|
|
download_request_unlock (request);
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
/* NOTE: There was a bug where Akamai servers return the
|
|
* wrong result for a range request on small files. To avoid
|
|
* it if the range starts within the first KB of the file, just
|
|
* start at 0 instead */
|
|
if (request->range_start < 1024)
|
|
request->range_start = 0;
|
|
|
|
msg_headers = _soup_message_get_request_headers (msg);
|
|
|
|
if (request->range_start != 0 || request->range_end != -1) {
|
|
_soup_message_headers_set_range (msg_headers, request->range_start,
|
|
request->range_end);
|
|
}
|
|
|
|
download_request_unlock (request);
|
|
|
|
/* If resubmitting a request, clear any stale / unused data */
|
|
download_request_begin_download (request);
|
|
|
|
if ((flags & DOWNLOAD_FLAG_COMPRESS) == 0) {
|
|
_soup_message_disable_feature (msg, _soup_content_decoder_get_type ());
|
|
}
|
|
if (flags & DOWNLOAD_FLAG_FORCE_REFRESH) {
|
|
_soup_message_headers_append (msg_headers, "Cache-Control", "max-age=0");
|
|
}
|
|
|
|
/* Take the lock to protect header strings */
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
|
|
if (referer != NULL) {
|
|
_soup_message_headers_append (msg_headers, "Referer", referer);
|
|
} else if (dh->referer != NULL) {
|
|
_soup_message_headers_append (msg_headers, "Referer", dh->referer);
|
|
}
|
|
|
|
if (dh->user_agent != NULL) {
|
|
_soup_message_headers_append (msg_headers, "User-Agent", dh->user_agent);
|
|
}
|
|
|
|
if (dh->cookies != NULL) {
|
|
gchar **cookie;
|
|
|
|
for (cookie = dh->cookies; *cookie != NULL; cookie++) {
|
|
_soup_message_headers_append (msg_headers, "Cookie", *cookie);
|
|
}
|
|
}
|
|
|
|
transfer_task = transfer_task_new (dh, request, msg, blocking);
|
|
|
|
if (!dh->running) {
|
|
/* The download helper was deactivated just as we went to dispatch this request.
|
|
* Abort and manually wake the request, as it never went in the active_transfer list */
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
download_request_lock (request);
|
|
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
|
|
request->in_use = FALSE;
|
|
download_request_unlock (request);
|
|
|
|
g_cancellable_cancel (g_task_get_cancellable (transfer_task));
|
|
g_task_return_error_if_cancelled (transfer_task);
|
|
g_object_unref (transfer_task);
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
download_request_lock (request);
|
|
request->in_use = TRUE;
|
|
download_request_unlock (request);
|
|
|
|
g_signal_connect (msg, "restarted", G_CALLBACK (soup_msg_restarted_cb),
|
|
transfer_task);
|
|
|
|
/* Now send the request over to the main loop for actual submission */
|
|
GST_LOG ("Submitting transfer task %p", transfer_task);
|
|
g_async_queue_push (dh->transfer_requests, transfer_task);
|
|
|
|
/* No pending idle source to wake the transfer loop - so create one */
|
|
if (dh->transfer_requests_source == NULL) {
|
|
dh->transfer_requests_source = g_idle_source_new ();
|
|
g_source_set_callback (dh->transfer_requests_source,
|
|
(GSourceFunc) submit_transfers_cb, dh, NULL);
|
|
g_source_attach (dh->transfer_requests_source, dh->transfer_context);
|
|
}
|
|
|
|
if (blocking) {
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
/* We need an extra ref on the task to make sure it stays alive.
|
|
* We pushed it in the async queue, but didn't unlock yet, so while
|
|
* we gave away our ref, the receiver can't have unreffed it */
|
|
g_object_ref (transfer_task);
|
|
while (!transfer->complete)
|
|
g_cond_wait (&transfer->cond, &dh->transfer_lock);
|
|
g_object_unref (transfer_task);
|
|
}
|
|
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
void
|
|
downloadhelper_cancel_request (DownloadHelper * dh, DownloadRequest * request)
|
|
{
|
|
int i;
|
|
|
|
g_mutex_lock (&dh->transfer_lock);
|
|
|
|
download_request_lock (request);
|
|
if (!request->in_use)
|
|
goto out;
|
|
|
|
GST_DEBUG ("Cancelling request for URI %s range %" G_GINT64_FORMAT " %"
|
|
G_GINT64_FORMAT, request->uri, request->range_start, request->range_end);
|
|
|
|
request->state = DOWNLOAD_REQUEST_STATE_CANCELLED;
|
|
for (i = dh->active_transfers->len - 1; i >= 0; i--) {
|
|
GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
|
|
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
|
|
|
|
if (transfer->request == request) {
|
|
GST_DEBUG ("Found transfer %p for request for URI %s range %"
|
|
G_GINT64_FORMAT " %" G_GINT64_FORMAT, transfer, request->uri,
|
|
request->range_start, request->range_end);
|
|
g_cancellable_cancel (transfer->cancellable);
|
|
break;
|
|
}
|
|
}
|
|
|
|
out:
|
|
download_request_unlock (request);
|
|
g_mutex_unlock (&dh->transfer_lock);
|
|
}
|
|
|
|
DownloadRequest *
|
|
downloadhelper_fetch_uri_range (DownloadHelper * dh, const gchar * uri,
|
|
const gchar * referer, DownloadFlags flags, gint64 range_start,
|
|
gint64 range_end, GError ** err)
|
|
{
|
|
DownloadRequest *request;
|
|
|
|
g_return_val_if_fail (uri != NULL, NULL);
|
|
|
|
GST_DEBUG ("Fetching URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT,
|
|
uri, range_start, range_end);
|
|
|
|
flags |= DOWNLOAD_FLAG_BLOCKING;
|
|
|
|
request = download_request_new_uri_range (uri, range_start, range_end);
|
|
|
|
if (!downloadhelper_submit_request (dh, referer, flags, request, err)) {
|
|
download_request_unref (request);
|
|
return NULL;
|
|
}
|
|
|
|
return request;
|
|
}
|
|
|
|
DownloadRequest *
|
|
downloadhelper_fetch_uri (DownloadHelper * dh, const gchar * uri,
|
|
const gchar * referer, DownloadFlags flags, GError ** err)
|
|
{
|
|
return downloadhelper_fetch_uri_range (dh, uri, referer, flags, 0, -1, err);
|
|
}
|