gstreamer/subprojects/gst-plugins-good/ext/adaptivedemux2/downloadhelper.c

1255 lines
38 KiB
C
Raw Normal View History

/* GStreamer
* Copyright (C) 2021-2022 Jan Schmidt <jan@centricular.com>
*
* downloadhelper.c:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* Youshould have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include "downloadhelper.h"
#include "../soup/gstsouploader.h"
GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
#define GST_CAT_DEFAULT adaptivedemux2_debug
#define CHUNK_BUFFER_SIZE 32768
typedef struct DownloadHelperTransfer DownloadHelperTransfer;
struct DownloadHelper
{
GThread *transfer_thread;
gboolean running;
GstAdaptiveDemuxClock *clock;
GMainContext *transfer_context;
GMainLoop *loop;
SoupSession *session;
GMutex transfer_lock;
GArray *active_transfers;
GAsyncQueue *transfer_requests;
GSource *transfer_requests_source;
gchar *referer;
gchar *user_agent;
GSList *cookies;
};
struct DownloadHelperTransfer
{
DownloadHelper *dh;
gboolean blocking;
gboolean complete;
gboolean progress_pending;
GCond cond;
GCancellable *cancellable;
gboolean is_file_transfer;
union
{
SoupMessage *msg;
GFile *file;
};
gboolean request_sent;
/* Current read buffer */
char *read_buffer;
guint64 read_buffer_size;
guint64 read_position; /* Start in bytes of the read_buffer */
DownloadRequest *request;
};
static void
free_transfer (DownloadHelperTransfer * transfer)
{
DownloadRequest *request = transfer->request;
if (request)
download_request_unref (request);
if (transfer->blocking)
g_cond_clear (&transfer->cond);
g_object_unref (transfer->msg);
g_free (transfer->read_buffer);
g_free (transfer);
}
static void
transfer_completion_cb (gpointer src_object, GAsyncResult * res,
gpointer user_data)
{
DownloadHelperTransfer *transfer = g_task_get_task_data ((GTask *) res);
DownloadRequest *request = transfer->request;
if (transfer->blocking)
return;
download_request_lock (request);
request->in_use = FALSE;
GST_LOG ("Despatching completion for transfer %p request %p", transfer,
request);
download_request_despatch_completion (request);
download_request_unlock (request);
}
static gboolean
transfer_report_progress_cb (gpointer task)
{
DownloadHelperTransfer *transfer;
DownloadRequest *request;
/* Already completed - late callback */
if (g_task_get_completed (task))
return FALSE;
transfer = g_task_get_task_data (task);
request = transfer->request;
download_request_lock (request);
if (request->send_progress) {
GST_LOG ("Despatching progress for transfer %p request %p", transfer,
request);
download_request_despatch_progress (request);
}
transfer->progress_pending = FALSE;
download_request_unlock (request);
return FALSE;
}
static DownloadHelperTransfer *
transfer_new_common (DownloadHelper * dh, DownloadRequest * request,
gboolean blocking)
{
DownloadHelperTransfer *transfer = g_new0 (DownloadHelperTransfer, 1);
transfer->blocking = blocking;
if (transfer->blocking)
g_cond_init (&transfer->cond);
transfer->cancellable = g_cancellable_new ();
transfer->request = download_request_ref (request);
transfer->dh = dh;
return transfer;
}
static GTask *
transfer_task_new_for_transfer (DownloadHelperTransfer * transfer)
{
GTask *transfer_task = NULL;
transfer_task =
g_task_new (NULL, transfer->cancellable,
(GAsyncReadyCallback) transfer_completion_cb, NULL);
g_task_set_task_data (transfer_task, transfer,
(GDestroyNotify) free_transfer);
return transfer_task;
}
static GTask *
transfer_task_new_file (DownloadHelper * dh, DownloadRequest * request,
gboolean blocking)
{
DownloadHelperTransfer *transfer =
transfer_new_common (dh, request, blocking);
transfer->is_file_transfer = TRUE;
transfer->file = g_file_new_for_uri (request->uri);
return transfer_task_new_for_transfer (transfer);
}
static GTask *
transfer_task_new_soup (DownloadHelper * dh, DownloadRequest * request,
SoupMessage * msg, gboolean blocking)
{
DownloadHelperTransfer *transfer =
transfer_new_common (dh, request, blocking);
transfer->msg = msg;
return transfer_task_new_for_transfer (transfer);
}
static void
release_transfer_task_by_ref (GTask ** transfer_task)
{
g_object_unref (*transfer_task);
}
/* Called with download_request lock held */
static void
transfer_task_report_progress (GTask * transfer_task)
{
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
DownloadRequest *request = transfer->request;
GSource *idle_source;
if (transfer->progress_pending == TRUE || !request->send_progress)
return;
/* There's no progress cb pending and this download wants reports, so
* attach an idle source */
transfer->progress_pending = TRUE;
idle_source = g_idle_source_new ();
g_task_attach_source (transfer_task, idle_source,
transfer_report_progress_cb);
g_source_unref (idle_source);
}
static void
finish_transfer_task (DownloadHelper * dh, GTask * transfer_task,
GError * error)
{
int i;
g_mutex_lock (&dh->transfer_lock);
for (i = dh->active_transfers->len - 1; i >= 0; i--) {
if (transfer_task == g_array_index (dh->active_transfers, GTask *, i)) {
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
transfer->complete = TRUE;
if (transfer->blocking)
g_cond_broadcast (&transfer->cond);
if (error != NULL)
g_task_return_error (transfer_task, error);
else
g_task_return_boolean (transfer_task, TRUE);
/* This drops the task ref: */
g_array_remove_index_fast (dh->active_transfers, i);
g_mutex_unlock (&dh->transfer_lock);
return;
}
}
g_mutex_unlock (&dh->transfer_lock);
GST_WARNING ("Did not find transfer %p in the active transfer list",
transfer_task);
}
static gboolean
new_read_buffer (DownloadHelperTransfer * transfer)
{
gint buffer_size = CHUNK_BUFFER_SIZE;
if (transfer->is_file_transfer) {
/* For file reads, handle range request limits here */
DownloadRequest *request = transfer->request;
if (request->range_end != -1) {
if (request->range_end <= transfer->read_position) {
transfer->read_buffer = NULL;
transfer->read_buffer_size = 0;
return FALSE;
}
if (request->range_end - transfer->read_position < buffer_size)
buffer_size = request->range_end - transfer->read_position + 1;
}
}
transfer->read_buffer = g_new (char, buffer_size);
transfer->read_buffer_size = buffer_size;
return TRUE;
}
static void
on_read_ready (GObject * source, GAsyncResult * result, gpointer user_data)
{
GTask *transfer_task = user_data;
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
DownloadHelper *dh = transfer->dh;
DownloadRequest *request = transfer->request;
GInputStream *in = G_INPUT_STREAM (source);
GError *error = NULL;
gsize bytes_read = 0;
GstClockTime now = gst_adaptive_demux_clock_get_time (dh->clock);
gboolean read_failed =
g_input_stream_read_all_finish (in, result, &bytes_read, &error);
download_request_lock (request);
if (error) {
g_free (transfer->read_buffer);
transfer->read_buffer = NULL;
if (!g_cancellable_is_cancelled (transfer->cancellable)) {
GST_ERROR ("Failed to read stream: %s", error->message);
adaptivedemux2: Do not submit_transfer when cancelled There is a race condition where transfer has not been submitted yet while the request is cancelled which leads to the transfer state going back to `DOWNLOAD_REQUEST_STATE_OPEN` and the user of the request to get signalled about its completion (and the task actually happening after it was cancelled) leading to assertions and misbehaviours. To ensure that this race can't happen, we start differentiating between the UNSENT and CANCELLED states as in the normal case, when entering `submit_request` the state is UNSENT and at that point we need to know that it is not because the request has been cancelled. In practice this case lead to an assertion in `gst_adaptive_demux2_stream_begin_download_uri` because in a previous call to `gst_adaptive_demux2_stream_stop_default` we cancelled the previous request and setup a new one while it had not been submitted yet and then got a `on_download_complete` callback called from that previous cancelled request and then we tried to do `download_request_set_uri` on a request that was still `in_use`, leading to something like: ``` #0: 0x0000000186655ec8 g_assert (request->in_use == FALSE)assert.c:0 #1: 0x00000001127236b8 libgstadaptivedemux2.dylib`download_request_set_uri(request=0x000060000017cc00, uri="https://XXX/chunk-stream1-00002.webm", range_start=0, range_end=-1) at downloadrequest.c:361 #2: 0x000000011271cee8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_begin_download_uri(stream=0x00000001330f1800, uri="https://XXX/chunk-stream1-00002.webm", start=0, end=-1) at gstadaptivedemux-stream.c:1447 #3: 0x0000000112719898 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment [inlined] gst_adaptive_demux2_stream_download_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:0 #4: 0x00000001127197f8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:1969 #5: 0x000000011271c2a4 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_next_download(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:2112 ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5435>
2023-10-04 14:09:37 +00:00
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED)
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
finish_transfer_task (dh, transfer_task, error);
} else {
/* Ignore error from cancelled operation */
g_error_free (error);
finish_transfer_task (dh, transfer_task, NULL);
}
download_request_unlock (request);
return;
}
if (bytes_read > 0) {
GstBuffer *gst_buffer =
gst_buffer_new_wrapped (transfer->read_buffer, bytes_read);
GST_BUFFER_OFFSET (gst_buffer) = transfer->read_position;
transfer->read_position += bytes_read;
transfer->read_buffer = NULL;
/* Clip the buffer to within the range */
if (GST_BUFFER_OFFSET (gst_buffer) < request->range_start) {
if (transfer->read_position <= request->range_start) {
GST_DEBUG ("Discarding %" G_GSIZE_FORMAT
" bytes entirely before requested range",
gst_buffer_get_size (gst_buffer));
/* This buffer is completely before the range start, discard it */
gst_buffer_unref (gst_buffer);
gst_buffer = NULL;
} else {
GST_DEBUG ("Clipping first %" G_GINT64_FORMAT
" bytes before requested range",
request->range_start - GST_BUFFER_OFFSET (gst_buffer));
/* This buffer is partially within the requested range, clip the beginning */
gst_buffer_resize (gst_buffer,
request->range_start - GST_BUFFER_OFFSET (gst_buffer), -1);
GST_BUFFER_OFFSET (gst_buffer) = request->range_start;
}
}
if (gst_buffer != NULL) {
adaptivedemux2: Do not submit_transfer when cancelled There is a race condition where transfer has not been submitted yet while the request is cancelled which leads to the transfer state going back to `DOWNLOAD_REQUEST_STATE_OPEN` and the user of the request to get signalled about its completion (and the task actually happening after it was cancelled) leading to assertions and misbehaviours. To ensure that this race can't happen, we start differentiating between the UNSENT and CANCELLED states as in the normal case, when entering `submit_request` the state is UNSENT and at that point we need to know that it is not because the request has been cancelled. In practice this case lead to an assertion in `gst_adaptive_demux2_stream_begin_download_uri` because in a previous call to `gst_adaptive_demux2_stream_stop_default` we cancelled the previous request and setup a new one while it had not been submitted yet and then got a `on_download_complete` callback called from that previous cancelled request and then we tried to do `download_request_set_uri` on a request that was still `in_use`, leading to something like: ``` #0: 0x0000000186655ec8 g_assert (request->in_use == FALSE)assert.c:0 #1: 0x00000001127236b8 libgstadaptivedemux2.dylib`download_request_set_uri(request=0x000060000017cc00, uri="https://XXX/chunk-stream1-00002.webm", range_start=0, range_end=-1) at downloadrequest.c:361 #2: 0x000000011271cee8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_begin_download_uri(stream=0x00000001330f1800, uri="https://XXX/chunk-stream1-00002.webm", start=0, end=-1) at gstadaptivedemux-stream.c:1447 #3: 0x0000000112719898 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment [inlined] gst_adaptive_demux2_stream_download_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:0 #4: 0x00000001127197f8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:1969 #5: 0x000000011271c2a4 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_next_download(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:2112 ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5435>
2023-10-04 14:09:37 +00:00
/* Don't override CANCELLED state. Otherwise make sure it is LOADING */
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED)
request->state = DOWNLOAD_REQUEST_STATE_LOADING;
if (request->download_start_time == GST_CLOCK_TIME_NONE) {
GST_LOG ("Got first data for URI %s", request->uri);
request->download_start_time = now;
}
request->download_newest_data_time = now;
GST_LOG ("Adding %u bytes to buffer (request URI %s)",
(guint) (gst_buffer_get_size (gst_buffer)), request->uri);
download_request_add_buffer (request, gst_buffer);
transfer_task_report_progress (transfer_task);
}
} else if (read_failed) {
/* The read failed and returned 0 bytes: We're done */
goto finish_transfer;
}
/* Resubmit the read request to get more */
if (!new_read_buffer (transfer))
goto finish_transfer;
g_main_context_push_thread_default (dh->transfer_context);
g_input_stream_read_all_async (in, transfer->read_buffer,
transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
on_read_ready, transfer_task);
g_main_context_pop_thread_default (dh->transfer_context);
download_request_unlock (request);
return;
finish_transfer:
if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
if (!transfer->is_file_transfer) {
request->status_code = _soup_message_get_status (transfer->msg);
}
#ifndef GST_DISABLE_GST_DEBUG
guint download_ms = (now - request->download_request_time) / GST_MSECOND;
GST_LOG ("request complete in %u ms. Code %d URI %s range %" G_GINT64_FORMAT
" %" G_GINT64_FORMAT, download_ms, request->status_code,
request->uri, request->range_start, request->range_end);
#endif
adaptivedemux2: Do not submit_transfer when cancelled There is a race condition where transfer has not been submitted yet while the request is cancelled which leads to the transfer state going back to `DOWNLOAD_REQUEST_STATE_OPEN` and the user of the request to get signalled about its completion (and the task actually happening after it was cancelled) leading to assertions and misbehaviours. To ensure that this race can't happen, we start differentiating between the UNSENT and CANCELLED states as in the normal case, when entering `submit_request` the state is UNSENT and at that point we need to know that it is not because the request has been cancelled. In practice this case lead to an assertion in `gst_adaptive_demux2_stream_begin_download_uri` because in a previous call to `gst_adaptive_demux2_stream_stop_default` we cancelled the previous request and setup a new one while it had not been submitted yet and then got a `on_download_complete` callback called from that previous cancelled request and then we tried to do `download_request_set_uri` on a request that was still `in_use`, leading to something like: ``` #0: 0x0000000186655ec8 g_assert (request->in_use == FALSE)assert.c:0 #1: 0x00000001127236b8 libgstadaptivedemux2.dylib`download_request_set_uri(request=0x000060000017cc00, uri="https://XXX/chunk-stream1-00002.webm", range_start=0, range_end=-1) at downloadrequest.c:361 #2: 0x000000011271cee8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_begin_download_uri(stream=0x00000001330f1800, uri="https://XXX/chunk-stream1-00002.webm", start=0, end=-1) at gstadaptivedemux-stream.c:1447 #3: 0x0000000112719898 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment [inlined] gst_adaptive_demux2_stream_download_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:0 #4: 0x00000001127197f8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:1969 #5: 0x000000011271c2a4 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_next_download(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:2112 ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5435>
2023-10-04 14:09:37 +00:00
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED) {
if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code)
|| SOUP_STATUS_IS_REDIRECTION (request->status_code)) {
request->state = DOWNLOAD_REQUEST_STATE_COMPLETE;
adaptivedemux2: Do not submit_transfer when cancelled There is a race condition where transfer has not been submitted yet while the request is cancelled which leads to the transfer state going back to `DOWNLOAD_REQUEST_STATE_OPEN` and the user of the request to get signalled about its completion (and the task actually happening after it was cancelled) leading to assertions and misbehaviours. To ensure that this race can't happen, we start differentiating between the UNSENT and CANCELLED states as in the normal case, when entering `submit_request` the state is UNSENT and at that point we need to know that it is not because the request has been cancelled. In practice this case lead to an assertion in `gst_adaptive_demux2_stream_begin_download_uri` because in a previous call to `gst_adaptive_demux2_stream_stop_default` we cancelled the previous request and setup a new one while it had not been submitted yet and then got a `on_download_complete` callback called from that previous cancelled request and then we tried to do `download_request_set_uri` on a request that was still `in_use`, leading to something like: ``` #0: 0x0000000186655ec8 g_assert (request->in_use == FALSE)assert.c:0 #1: 0x00000001127236b8 libgstadaptivedemux2.dylib`download_request_set_uri(request=0x000060000017cc00, uri="https://XXX/chunk-stream1-00002.webm", range_start=0, range_end=-1) at downloadrequest.c:361 #2: 0x000000011271cee8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_begin_download_uri(stream=0x00000001330f1800, uri="https://XXX/chunk-stream1-00002.webm", start=0, end=-1) at gstadaptivedemux-stream.c:1447 #3: 0x0000000112719898 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment [inlined] gst_adaptive_demux2_stream_download_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:0 #4: 0x00000001127197f8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:1969 #5: 0x000000011271c2a4 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_next_download(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:2112 ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5435>
2023-10-04 14:09:37 +00:00
} else {
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
adaptivedemux2: Do not submit_transfer when cancelled There is a race condition where transfer has not been submitted yet while the request is cancelled which leads to the transfer state going back to `DOWNLOAD_REQUEST_STATE_OPEN` and the user of the request to get signalled about its completion (and the task actually happening after it was cancelled) leading to assertions and misbehaviours. To ensure that this race can't happen, we start differentiating between the UNSENT and CANCELLED states as in the normal case, when entering `submit_request` the state is UNSENT and at that point we need to know that it is not because the request has been cancelled. In practice this case lead to an assertion in `gst_adaptive_demux2_stream_begin_download_uri` because in a previous call to `gst_adaptive_demux2_stream_stop_default` we cancelled the previous request and setup a new one while it had not been submitted yet and then got a `on_download_complete` callback called from that previous cancelled request and then we tried to do `download_request_set_uri` on a request that was still `in_use`, leading to something like: ``` #0: 0x0000000186655ec8 g_assert (request->in_use == FALSE)assert.c:0 #1: 0x00000001127236b8 libgstadaptivedemux2.dylib`download_request_set_uri(request=0x000060000017cc00, uri="https://XXX/chunk-stream1-00002.webm", range_start=0, range_end=-1) at downloadrequest.c:361 #2: 0x000000011271cee8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_begin_download_uri(stream=0x00000001330f1800, uri="https://XXX/chunk-stream1-00002.webm", start=0, end=-1) at gstadaptivedemux-stream.c:1447 #3: 0x0000000112719898 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment [inlined] gst_adaptive_demux2_stream_download_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:0 #4: 0x00000001127197f8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:1969 #5: 0x000000011271c2a4 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_next_download(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:2112 ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5435>
2023-10-04 14:09:37 +00:00
}
}
}
request->download_end_time = now;
g_free (transfer->read_buffer);
transfer->read_buffer = NULL;
download_request_unlock (request);
finish_transfer_task (dh, transfer_task, NULL);
}
static void
http_header_to_structure (const gchar * name, const gchar * value,
gpointer user_data)
{
GstStructure *headers = user_data;
const GValue *gv;
if (!g_utf8_validate (name, -1, NULL) || !g_utf8_validate (value, -1, NULL))
return;
gv = gst_structure_get_value (headers, name);
if (gv && GST_VALUE_HOLDS_ARRAY (gv)) {
GValue v = G_VALUE_INIT;
g_value_init (&v, G_TYPE_STRING);
g_value_set_string (&v, value);
gst_value_array_append_value ((GValue *) gv, &v);
g_value_unset (&v);
} else if (gv && G_VALUE_HOLDS_STRING (gv)) {
GValue arr = G_VALUE_INIT;
GValue v = G_VALUE_INIT;
const gchar *old_value = g_value_get_string (gv);
g_value_init (&arr, GST_TYPE_ARRAY);
g_value_init (&v, G_TYPE_STRING);
g_value_set_string (&v, old_value);
gst_value_array_append_value (&arr, &v);
g_value_set_string (&v, value);
gst_value_array_append_value (&arr, &v);
gst_structure_set_value (headers, name, &arr);
g_value_unset (&v);
g_value_unset (&arr);
} else {
gst_structure_set (headers, name, G_TYPE_STRING, value, NULL);
}
}
static void
soup_msg_restarted_cb (SoupMessage * msg, gpointer user_data)
{
GTask *transfer_task = user_data;
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
DownloadRequest *request = transfer->request;
SoupStatus status = _soup_message_get_status (msg);
if (SOUP_STATUS_IS_REDIRECTION (status)) {
char *redirect_uri = gst_soup_message_uri_to_string (msg);
gboolean redirect_permanent = (status == SOUP_STATUS_MOVED_PERMANENTLY);
GST_DEBUG ("%u redirect to \"%s\" (permanent %d)",
status, redirect_uri, redirect_permanent);
download_request_lock (request);
g_free (request->redirect_uri);
request->redirect_uri = redirect_uri;
request->redirect_permanent = redirect_permanent;
download_request_unlock (request);
}
}
static GstStructure *
handle_response_headers (DownloadHelperTransfer * transfer)
{
DownloadRequest *request = transfer->request;
SoupMessage *msg = transfer->msg;
SoupMessageHeaders *response_headers;
GstStructure *http_headers, *headers;
http_headers = gst_structure_new_empty ("http-headers");
#if 0
if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED &&
src->proxy_id && src->proxy_pw) {
/* wait for authenticate callback */
return GST_FLOW_OK;
}
if (src->redirection_uri)
gst_structure_set (http_headers, "redirection-uri", G_TYPE_STRING,
src->redirection_uri, NULL);
#endif
headers = gst_structure_new_empty ("request-headers");
_soup_message_headers_foreach (_soup_message_get_request_headers (msg),
http_header_to_structure, headers);
gst_structure_set (http_headers, "request-headers", GST_TYPE_STRUCTURE,
headers, NULL);
gst_structure_free (headers);
headers = gst_structure_new_empty ("response-headers");
response_headers = _soup_message_get_response_headers (msg);
_soup_message_headers_foreach (response_headers, http_header_to_structure,
headers);
gst_structure_set (http_headers, "response-headers", GST_TYPE_STRUCTURE,
headers, NULL);
gst_structure_free (headers);
#if 0
if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
/* force an error */
gst_structure_free (http_headers);
return gst_soup_http_src_parse_status (msg, src);
}
#endif
/* Parse Content-Length. */
if (SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (msg)) &&
(_soup_message_headers_get_encoding (response_headers) ==
SOUP_ENCODING_CONTENT_LENGTH)) {
request->content_length =
_soup_message_headers_get_content_length (response_headers);
}
/* Parse Content-Range in a partial content response to set our initial read_position */
transfer->read_position = 0;
if (_soup_message_get_status (msg) == SOUP_STATUS_PARTIAL_CONTENT) {
goffset start, end;
if (_soup_message_headers_get_content_range (response_headers, &start,
&end, NULL)) {
GST_DEBUG ("Content-Range response %" G_GOFFSET_FORMAT "-%"
G_GOFFSET_FORMAT, start, end);
transfer->read_position = start;
}
}
if (transfer->read_position != request->range_start) {
GST_WARNING ("Server did not respect our range request for range %"
G_GINT64_FORMAT " to %" G_GINT64_FORMAT " - starting at offset %"
G_GUINT64_FORMAT, request->range_start, request->range_end,
transfer->read_position);
}
return http_headers;
}
static void
on_file_ready (GObject * source, GAsyncResult * result, gpointer user_data)
{
GTask *transfer_task = user_data;
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
DownloadHelper *dh = transfer->dh;
DownloadRequest *request = transfer->request;
GError *error = NULL;
GFileInputStream *in = g_file_read_finish (transfer->file, result, &error);
download_request_lock (request);
if (in == NULL) {
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND)) {
request->status_code = SOUP_STATUS_NOT_FOUND;
} else if (g_error_matches (error, G_IO_ERROR,
G_IO_ERROR_PERMISSION_DENIED)) {
request->status_code = SOUP_STATUS_FORBIDDEN;
} else {
request->status_code = SOUP_STATUS_INTERNAL_SERVER_ERROR;
}
if (!g_cancellable_is_cancelled (transfer->cancellable)) {
GST_LOG ("request errored. Code %d URI %s range %" G_GINT64_FORMAT " %"
G_GINT64_FORMAT, request->status_code, request->uri,
request->range_start, request->range_end);
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED)
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
finish_transfer_task (dh, transfer_task, error);
} else {
/* Ignore error from cancelled operation */
g_error_free (error);
finish_transfer_task (dh, transfer_task, NULL);
}
download_request_unlock (request);
/* No async callback queued - the transfer is done */
finish_transfer_task (dh, transfer_task, error);
return;
}
/* If the state is cancelled don't override it */
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED &&
request->state != DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED) {
request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
request->status_code = SOUP_STATUS_OK;
// FIXME: request->headers = handle_response_headers (transfer);
GST_TRACE ("request URI %s range %" G_GINT64_FORMAT " %"
G_GINT64_FORMAT " headers: %" GST_PTR_FORMAT,
request->uri, request->range_start, request->range_end,
request->headers);
if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code)
|| SOUP_STATUS_IS_REDIRECTION (request->status_code)) {
request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
transfer_task_report_progress (transfer_task);
} else {
goto finish_transfer_error;
}
}
if (!new_read_buffer (transfer))
goto finish_transfer_error;
/* Respect any range request */
if (request->range_start != 0) {
if (!g_seekable_seek (G_SEEKABLE (in), request->range_start, G_SEEK_SET,
transfer->cancellable, &error)) {
goto finish_transfer_error;
}
transfer->read_position = request->range_start;
}
download_request_unlock (request);
g_main_context_push_thread_default (dh->transfer_context);
g_input_stream_read_all_async (G_INPUT_STREAM (in), transfer->read_buffer,
transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
on_read_ready, transfer_task);
g_main_context_pop_thread_default (dh->transfer_context);
g_object_unref (in);
return;
finish_transfer_error:
request->download_end_time = gst_adaptive_demux_clock_get_time (dh->clock);
if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %"
G_GINT64_FORMAT, request->status_code, request->uri,
request->range_start, request->range_end);
/* If the state is cancelled don't override it */
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED)
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
}
g_free (transfer->read_buffer);
transfer->read_buffer = NULL;
download_request_unlock (request);
finish_transfer_task (dh, transfer_task, NULL);
g_object_unref (in);
}
static void
on_request_sent (GObject * source, GAsyncResult * result, gpointer user_data)
{
GTask *transfer_task = user_data;
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
DownloadHelper *dh = transfer->dh;
DownloadRequest *request = transfer->request;
SoupMessage *msg = transfer->msg;
GError *error = NULL;
GInputStream *in =
_soup_session_send_finish ((SoupSession *) source, result, &error);
download_request_lock (request);
if (in == NULL) {
request->status_code = _soup_message_get_status (msg);
if (!g_cancellable_is_cancelled (transfer->cancellable)) {
GST_LOG ("request errored. Code %d URI %s range %" G_GINT64_FORMAT " %"
G_GINT64_FORMAT, request->status_code, request->uri,
request->range_start, request->range_end);
adaptivedemux2: Do not submit_transfer when cancelled There is a race condition where transfer has not been submitted yet while the request is cancelled which leads to the transfer state going back to `DOWNLOAD_REQUEST_STATE_OPEN` and the user of the request to get signalled about its completion (and the task actually happening after it was cancelled) leading to assertions and misbehaviours. To ensure that this race can't happen, we start differentiating between the UNSENT and CANCELLED states as in the normal case, when entering `submit_request` the state is UNSENT and at that point we need to know that it is not because the request has been cancelled. In practice this case lead to an assertion in `gst_adaptive_demux2_stream_begin_download_uri` because in a previous call to `gst_adaptive_demux2_stream_stop_default` we cancelled the previous request and setup a new one while it had not been submitted yet and then got a `on_download_complete` callback called from that previous cancelled request and then we tried to do `download_request_set_uri` on a request that was still `in_use`, leading to something like: ``` #0: 0x0000000186655ec8 g_assert (request->in_use == FALSE)assert.c:0 #1: 0x00000001127236b8 libgstadaptivedemux2.dylib`download_request_set_uri(request=0x000060000017cc00, uri="https://XXX/chunk-stream1-00002.webm", range_start=0, range_end=-1) at downloadrequest.c:361 #2: 0x000000011271cee8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_begin_download_uri(stream=0x00000001330f1800, uri="https://XXX/chunk-stream1-00002.webm", start=0, end=-1) at gstadaptivedemux-stream.c:1447 #3: 0x0000000112719898 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment [inlined] gst_adaptive_demux2_stream_download_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:0 #4: 0x00000001127197f8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:1969 #5: 0x000000011271c2a4 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_next_download(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:2112 ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5435>
2023-10-04 14:09:37 +00:00
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED)
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
finish_transfer_task (dh, transfer_task, error);
} else {
/* Ignore error from cancelled operation */
g_error_free (error);
finish_transfer_task (dh, transfer_task, NULL);
}
download_request_unlock (request);
/* No async callback queued - the transfer is done */
finish_transfer_task (dh, transfer_task, error);
return;
}
adaptivedemux2: Do not submit_transfer when cancelled There is a race condition where transfer has not been submitted yet while the request is cancelled which leads to the transfer state going back to `DOWNLOAD_REQUEST_STATE_OPEN` and the user of the request to get signalled about its completion (and the task actually happening after it was cancelled) leading to assertions and misbehaviours. To ensure that this race can't happen, we start differentiating between the UNSENT and CANCELLED states as in the normal case, when entering `submit_request` the state is UNSENT and at that point we need to know that it is not because the request has been cancelled. In practice this case lead to an assertion in `gst_adaptive_demux2_stream_begin_download_uri` because in a previous call to `gst_adaptive_demux2_stream_stop_default` we cancelled the previous request and setup a new one while it had not been submitted yet and then got a `on_download_complete` callback called from that previous cancelled request and then we tried to do `download_request_set_uri` on a request that was still `in_use`, leading to something like: ``` #0: 0x0000000186655ec8 g_assert (request->in_use == FALSE)assert.c:0 #1: 0x00000001127236b8 libgstadaptivedemux2.dylib`download_request_set_uri(request=0x000060000017cc00, uri="https://XXX/chunk-stream1-00002.webm", range_start=0, range_end=-1) at downloadrequest.c:361 #2: 0x000000011271cee8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_begin_download_uri(stream=0x00000001330f1800, uri="https://XXX/chunk-stream1-00002.webm", start=0, end=-1) at gstadaptivedemux-stream.c:1447 #3: 0x0000000112719898 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment [inlined] gst_adaptive_demux2_stream_download_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:0 #4: 0x00000001127197f8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:1969 #5: 0x000000011271c2a4 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_next_download(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:2112 ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5435>
2023-10-04 14:09:37 +00:00
/* If the state is cancelled don't override it */
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED &&
request->state != DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED) {
request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
request->status_code = _soup_message_get_status (msg);
request->headers = handle_response_headers (transfer);
GST_TRACE ("request URI %s range %" G_GINT64_FORMAT " %"
G_GINT64_FORMAT " headers: %" GST_PTR_FORMAT,
request->uri, request->range_start, request->range_end,
request->headers);
if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code)
|| SOUP_STATUS_IS_REDIRECTION (request->status_code)) {
request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
transfer_task_report_progress (transfer_task);
} else {
goto finish_transfer_error;
}
}
if (!new_read_buffer (transfer))
goto finish_transfer_error;
download_request_unlock (request);
g_main_context_push_thread_default (dh->transfer_context);
g_input_stream_read_all_async (in, transfer->read_buffer,
transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
on_read_ready, transfer_task);
g_main_context_pop_thread_default (dh->transfer_context);
g_object_unref (in);
return;
finish_transfer_error:
request->download_end_time = gst_adaptive_demux_clock_get_time (dh->clock);
if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %"
G_GINT64_FORMAT, _soup_message_get_status (msg), request->uri,
request->range_start, request->range_end);
adaptivedemux2: Do not submit_transfer when cancelled There is a race condition where transfer has not been submitted yet while the request is cancelled which leads to the transfer state going back to `DOWNLOAD_REQUEST_STATE_OPEN` and the user of the request to get signalled about its completion (and the task actually happening after it was cancelled) leading to assertions and misbehaviours. To ensure that this race can't happen, we start differentiating between the UNSENT and CANCELLED states as in the normal case, when entering `submit_request` the state is UNSENT and at that point we need to know that it is not because the request has been cancelled. In practice this case lead to an assertion in `gst_adaptive_demux2_stream_begin_download_uri` because in a previous call to `gst_adaptive_demux2_stream_stop_default` we cancelled the previous request and setup a new one while it had not been submitted yet and then got a `on_download_complete` callback called from that previous cancelled request and then we tried to do `download_request_set_uri` on a request that was still `in_use`, leading to something like: ``` #0: 0x0000000186655ec8 g_assert (request->in_use == FALSE)assert.c:0 #1: 0x00000001127236b8 libgstadaptivedemux2.dylib`download_request_set_uri(request=0x000060000017cc00, uri="https://XXX/chunk-stream1-00002.webm", range_start=0, range_end=-1) at downloadrequest.c:361 #2: 0x000000011271cee8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_begin_download_uri(stream=0x00000001330f1800, uri="https://XXX/chunk-stream1-00002.webm", start=0, end=-1) at gstadaptivedemux-stream.c:1447 #3: 0x0000000112719898 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment [inlined] gst_adaptive_demux2_stream_download_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:0 #4: 0x00000001127197f8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:1969 #5: 0x000000011271c2a4 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_next_download(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:2112 ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5435>
2023-10-04 14:09:37 +00:00
/* If the state is cancelled don't override it */
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED)
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
}
g_free (transfer->read_buffer);
transfer->read_buffer = NULL;
download_request_unlock (request);
finish_transfer_task (dh, transfer_task, NULL);
g_object_unref (in);
}
static inline gchar
gst_soup_util_log_make_level_tag (SoupLoggerLogLevel level)
{
gchar c;
if (G_UNLIKELY ((gint) level > 9))
return '?';
switch (level) {
case SOUP_LOGGER_LOG_MINIMAL:
c = 'M';
break;
case SOUP_LOGGER_LOG_HEADERS:
c = 'H';
break;
case SOUP_LOGGER_LOG_BODY:
c = 'B';
break;
default:
/* Unknown level. If this is hit libsoup likely added a new
* log level to SoupLoggerLogLevel and it should be added
* as a case */
c = level + '0';
break;
}
return c;
}
static void
gst_soup_util_log_printer_cb (SoupLogger G_GNUC_UNUSED * logger,
SoupLoggerLogLevel level, char direction, const char *data,
gpointer user_data)
{
gchar c;
c = gst_soup_util_log_make_level_tag (level);
GST_TRACE ("HTTP_SESSION(%c): %c %s", c, direction, data);
}
DownloadHelper *
downloadhelper_new (GstAdaptiveDemuxClock * clock)
{
DownloadHelper *dh = g_new0 (DownloadHelper, 1);
dh->transfer_context = g_main_context_new ();
dh->loop = g_main_loop_new (dh->transfer_context, FALSE);
dh->clock = gst_adaptive_demux_clock_ref (clock);
g_mutex_init (&dh->transfer_lock);
dh->active_transfers = g_array_new (FALSE, FALSE, sizeof (GTask *));
g_array_set_clear_func (dh->active_transfers,
(GDestroyNotify) (release_transfer_task_by_ref));
dh->transfer_requests =
g_async_queue_new_full ((GDestroyNotify) g_object_unref);
dh->transfer_requests_source = NULL;
/* libsoup 3.0 (not 2.74 or 3.1) dispatches using a single source attached
* when the session is created, so we need to ensure it matches here. */
g_main_context_push_thread_default (dh->transfer_context);
/* Set 10 second timeout. Any longer is likely
* an attempt to reuse an already closed connection */
dh->session = _soup_session_new_with_options ("timeout", 10, NULL);
/* Setup soup header debugging if we are at GST_LEVEL_TRACE */
if (gst_debug_category_get_threshold (GST_CAT_DEFAULT) >= GST_LEVEL_TRACE) {
/* Create a new logger and set body_size_limit to -1 (no limit) */
SoupLogger *logger = _soup_logger_new (SOUP_LOGGER_LOG_HEADERS);
_soup_logger_set_printer (logger, gst_soup_util_log_printer_cb, NULL, NULL);
/* Attach logger to session */
_soup_session_add_feature (dh->session, (SoupSessionFeature *) logger);
g_object_unref (logger);
}
g_main_context_pop_thread_default (dh->transfer_context);
return dh;
}
void
downloadhelper_free (DownloadHelper * dh)
{
downloadhelper_stop (dh);
if (dh->session)
g_object_unref (dh->session);
g_main_loop_unref (dh->loop);
g_main_context_unref (dh->transfer_context);
if (dh->clock)
gst_adaptive_demux_clock_unref (dh->clock);
g_array_free (dh->active_transfers, TRUE);
g_async_queue_unref (dh->transfer_requests);
g_free (dh->referer);
g_free (dh->user_agent);
_soup_cookies_free (dh->cookies);
g_free (dh);
}
void
downloadhelper_set_referer (DownloadHelper * dh, const gchar * referer)
{
g_mutex_lock (&dh->transfer_lock);
g_free (dh->referer);
dh->referer = g_strdup (referer);
g_mutex_unlock (&dh->transfer_lock);
}
void
downloadhelper_set_user_agent (DownloadHelper * dh, const gchar * user_agent)
{
g_mutex_lock (&dh->transfer_lock);
g_free (dh->user_agent);
dh->user_agent = g_strdup (user_agent);
g_mutex_unlock (&dh->transfer_lock);
}
/* Takes ownership of the strv */
void
downloadhelper_set_cookies (DownloadHelper * dh, gchar ** cookies)
{
guint i;
g_mutex_lock (&dh->transfer_lock);
_soup_cookies_free (dh->cookies);
dh->cookies = NULL;
for (i = 0; cookies[i]; i++) {
SoupCookie *cookie = _soup_cookie_parse (cookies[i]);
if (cookie == NULL) {
GST_WARNING ("Couldn't parse cookie, ignoring: %s", cookies[i]);
continue;
}
dh->cookies = g_slist_append (dh->cookies, cookie);
}
g_mutex_unlock (&dh->transfer_lock);
g_strfreev (cookies);
}
/* Called with the transfer lock held */
static void
submit_transfer (DownloadHelper * dh, GTask * transfer_task)
{
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
DownloadRequest *request = transfer->request;
download_request_lock (request);
adaptivedemux2: Do not submit_transfer when cancelled There is a race condition where transfer has not been submitted yet while the request is cancelled which leads to the transfer state going back to `DOWNLOAD_REQUEST_STATE_OPEN` and the user of the request to get signalled about its completion (and the task actually happening after it was cancelled) leading to assertions and misbehaviours. To ensure that this race can't happen, we start differentiating between the UNSENT and CANCELLED states as in the normal case, when entering `submit_request` the state is UNSENT and at that point we need to know that it is not because the request has been cancelled. In practice this case lead to an assertion in `gst_adaptive_demux2_stream_begin_download_uri` because in a previous call to `gst_adaptive_demux2_stream_stop_default` we cancelled the previous request and setup a new one while it had not been submitted yet and then got a `on_download_complete` callback called from that previous cancelled request and then we tried to do `download_request_set_uri` on a request that was still `in_use`, leading to something like: ``` #0: 0x0000000186655ec8 g_assert (request->in_use == FALSE)assert.c:0 #1: 0x00000001127236b8 libgstadaptivedemux2.dylib`download_request_set_uri(request=0x000060000017cc00, uri="https://XXX/chunk-stream1-00002.webm", range_start=0, range_end=-1) at downloadrequest.c:361 #2: 0x000000011271cee8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_begin_download_uri(stream=0x00000001330f1800, uri="https://XXX/chunk-stream1-00002.webm", start=0, end=-1) at gstadaptivedemux-stream.c:1447 #3: 0x0000000112719898 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment [inlined] gst_adaptive_demux2_stream_download_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:0 #4: 0x00000001127197f8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:1969 #5: 0x000000011271c2a4 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_next_download(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:2112 ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5435>
2023-10-04 14:09:37 +00:00
if (request->state == DOWNLOAD_REQUEST_STATE_CANCELLED) {
download_request_unlock (request);
GST_DEBUG ("Don't submit already cancelled transfer");
return;
}
request->state = DOWNLOAD_REQUEST_STATE_OPEN;
request->download_request_time =
gst_adaptive_demux_clock_get_time (dh->clock);
GST_LOG ("Submitting request URI %s range %" G_GINT64_FORMAT " %"
G_GINT64_FORMAT, request->uri, request->range_start, request->range_end);
transfer_task_report_progress (transfer_task);
download_request_unlock (request);
if (transfer->is_file_transfer) {
g_file_read_async (transfer->file, G_PRIORITY_DEFAULT,
transfer->cancellable, on_file_ready, transfer_task);
} else {
_soup_session_send_async (dh->session, transfer->msg, transfer->cancellable,
on_request_sent, transfer_task);
}
g_array_append_val (dh->active_transfers, transfer_task);
}
/* Idle callback that submits all pending transfers */
static gboolean
submit_transfers_cb (DownloadHelper * dh)
{
GTask *transfer;
g_mutex_lock (&dh->transfer_lock);
do {
transfer = g_async_queue_try_pop (dh->transfer_requests);
if (transfer) {
submit_transfer (dh, transfer);
}
} while (transfer != NULL);
/* FIXME: Use a PollFD like GWakeup instead? */
g_source_destroy (dh->transfer_requests_source);
g_source_unref (dh->transfer_requests_source);
dh->transfer_requests_source = NULL;
g_mutex_unlock (&dh->transfer_lock);
return G_SOURCE_REMOVE;
}
static gpointer
dh_transfer_thread_func (gpointer data)
{
DownloadHelper *dh = data;
GST_DEBUG ("DownloadHelper thread starting");
g_main_context_push_thread_default (dh->transfer_context);
g_main_loop_run (dh->loop);
g_main_context_pop_thread_default (dh->transfer_context);
GST_DEBUG ("Exiting DownloadHelper thread");
return NULL;
}
gboolean
downloadhelper_start (DownloadHelper * dh)
{
g_return_val_if_fail (dh->transfer_thread == NULL, FALSE);
g_mutex_lock (&dh->transfer_lock);
if (!dh->running) {
dh->transfer_thread =
g_thread_try_new ("adaptive-download-task", dh_transfer_thread_func, dh,
NULL);
dh->running = (dh->transfer_thread != NULL);
}
g_mutex_unlock (&dh->transfer_lock);
return dh->running;
}
void
downloadhelper_stop (DownloadHelper * dh)
{
int i;
GThread *transfer_thread = NULL;
GST_DEBUG ("Stopping DownloadHelper loop");
g_mutex_lock (&dh->transfer_lock);
dh->running = FALSE;
for (i = 0; i < dh->active_transfers->len; i++) {
GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
g_cancellable_cancel (transfer->cancellable);
}
g_main_loop_quit (dh->loop);
transfer_thread = dh->transfer_thread;
dh->transfer_thread = NULL;
g_mutex_unlock (&dh->transfer_lock);
if (transfer_thread != NULL) {
g_thread_join (transfer_thread);
}
/* The transfer thread has exited at this point - any remaining transfers are unfinished
* and need cleaning up */
g_mutex_lock (&dh->transfer_lock);
for (i = 0; i < dh->active_transfers->len; i++) {
GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
DownloadRequest *request = transfer->request;
download_request_lock (request);
adaptivedemux2: Do not submit_transfer when cancelled There is a race condition where transfer has not been submitted yet while the request is cancelled which leads to the transfer state going back to `DOWNLOAD_REQUEST_STATE_OPEN` and the user of the request to get signalled about its completion (and the task actually happening after it was cancelled) leading to assertions and misbehaviours. To ensure that this race can't happen, we start differentiating between the UNSENT and CANCELLED states as in the normal case, when entering `submit_request` the state is UNSENT and at that point we need to know that it is not because the request has been cancelled. In practice this case lead to an assertion in `gst_adaptive_demux2_stream_begin_download_uri` because in a previous call to `gst_adaptive_demux2_stream_stop_default` we cancelled the previous request and setup a new one while it had not been submitted yet and then got a `on_download_complete` callback called from that previous cancelled request and then we tried to do `download_request_set_uri` on a request that was still `in_use`, leading to something like: ``` #0: 0x0000000186655ec8 g_assert (request->in_use == FALSE)assert.c:0 #1: 0x00000001127236b8 libgstadaptivedemux2.dylib`download_request_set_uri(request=0x000060000017cc00, uri="https://XXX/chunk-stream1-00002.webm", range_start=0, range_end=-1) at downloadrequest.c:361 #2: 0x000000011271cee8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_begin_download_uri(stream=0x00000001330f1800, uri="https://XXX/chunk-stream1-00002.webm", start=0, end=-1) at gstadaptivedemux-stream.c:1447 #3: 0x0000000112719898 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment [inlined] gst_adaptive_demux2_stream_download_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:0 #4: 0x00000001127197f8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:1969 #5: 0x000000011271c2a4 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_next_download(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:2112 ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5435>
2023-10-04 14:09:37 +00:00
request->state = DOWNLOAD_REQUEST_STATE_CANCELLED;
download_request_unlock (request);
transfer->complete = TRUE;
if (transfer->blocking)
g_cond_broadcast (&transfer->cond);
g_task_return_boolean (transfer_task, TRUE);
}
g_array_set_size (dh->active_transfers, 0);
g_mutex_unlock (&dh->transfer_lock);
}
gboolean
downloadhelper_submit_request (DownloadHelper * dh,
const gchar * referer, DownloadFlags flags, DownloadRequest * request,
GError ** err)
{
GTask *transfer_task = NULL;
gboolean blocking = (flags & DOWNLOAD_FLAG_BLOCKING) != 0;
download_request_lock (request);
if (request->in_use) {
GST_ERROR ("Request for URI %s reusing active request object",
request->uri);
download_request_unlock (request);
return FALSE;
}
/* Clear the state back to unsent */
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
if (g_str_has_prefix (request->uri, "file://")) {
if (flags & DOWNLOAD_FLAG_HEADERS_ONLY) {
/* FIXME: Implement fake header only requests for file URIs? */
GST_ERROR ("file:// URIs do not support header-only requests");
download_request_unlock (request);
return FALSE;
}
download_request_unlock (request);
/* If resubmitting a request, clear any stale / unused data */
download_request_begin_download (request);
transfer_task = transfer_task_new_file (dh, request, blocking);
g_mutex_lock (&dh->transfer_lock);
} else {
const gchar *method =
(flags & DOWNLOAD_FLAG_HEADERS_ONLY) ? SOUP_METHOD_HEAD :
SOUP_METHOD_GET;
SoupMessage *msg = _soup_message_new (method, request->uri);
if (msg == NULL) {
g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
"Could not parse download URI %s", request->uri);
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
download_request_unlock (request);
return FALSE;
}
/* NOTE: There was a bug where Akamai servers return the
* wrong result for a range request on small files. To avoid
* it if the range starts within the first KB of the file, just
* start at 0 instead */
if (request->range_start < 1024)
request->range_start = 0;
SoupMessageHeaders *msg_headers = _soup_message_get_request_headers (msg);
if (request->range_start != 0 || request->range_end != -1) {
_soup_message_headers_set_range (msg_headers, request->range_start,
request->range_end);
}
download_request_unlock (request);
/* If resubmitting a request, clear any stale / unused data */
download_request_begin_download (request);
if ((flags & DOWNLOAD_FLAG_COMPRESS) == 0) {
_soup_message_disable_feature (msg, _soup_content_decoder_get_type ());
}
if (flags & DOWNLOAD_FLAG_FORCE_REFRESH) {
_soup_message_headers_append (msg_headers, "Cache-Control", "max-age=0");
}
/* Take the lock to protect header strings */
g_mutex_lock (&dh->transfer_lock);
if (referer != NULL) {
_soup_message_headers_append (msg_headers, "Referer", referer);
} else if (dh->referer != NULL) {
_soup_message_headers_append (msg_headers, "Referer", dh->referer);
}
if (dh->user_agent != NULL) {
_soup_message_headers_append (msg_headers, "User-Agent", dh->user_agent);
}
if (dh->cookies != NULL) {
_soup_cookies_to_request (dh->cookies, msg);
}
transfer_task = transfer_task_new_soup (dh, request, msg, blocking);
g_signal_connect (msg, "restarted", G_CALLBACK (soup_msg_restarted_cb),
transfer_task);
}
if (!dh->running) {
/* The download helper was deactivated just as we went to dispatch this request.
* Abort and manually wake the request, as it never went in the active_transfer list */
g_mutex_unlock (&dh->transfer_lock);
download_request_lock (request);
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
request->in_use = FALSE;
download_request_unlock (request);
g_cancellable_cancel (g_task_get_cancellable (transfer_task));
g_task_return_error_if_cancelled (transfer_task);
g_object_unref (transfer_task);
return FALSE;
}
download_request_lock (request);
request->in_use = TRUE;
download_request_unlock (request);
/* Now send the request over to the main loop for actual submission */
GST_LOG ("Submitting transfer task %p", transfer_task);
g_async_queue_push (dh->transfer_requests, transfer_task);
/* No pending idle source to wake the transfer loop - so create one */
if (dh->transfer_requests_source == NULL) {
dh->transfer_requests_source = g_idle_source_new ();
g_source_set_callback (dh->transfer_requests_source,
(GSourceFunc) submit_transfers_cb, dh, NULL);
g_source_attach (dh->transfer_requests_source, dh->transfer_context);
}
if (blocking) {
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
/* We need an extra ref on the task to make sure it stays alive.
* We pushed it in the async queue, but didn't unlock yet, so while
* we gave away our ref, the receiver can't have unreffed it */
g_object_ref (transfer_task);
while (!transfer->complete)
g_cond_wait (&transfer->cond, &dh->transfer_lock);
g_object_unref (transfer_task);
}
g_mutex_unlock (&dh->transfer_lock);
return TRUE;
}
void
downloadhelper_cancel_request (DownloadHelper * dh, DownloadRequest * request)
{
int i;
g_mutex_lock (&dh->transfer_lock);
download_request_lock (request);
if (!request->in_use)
goto out;
GST_DEBUG ("Cancelling request for URI %s range %" G_GINT64_FORMAT " %"
G_GINT64_FORMAT, request->uri, request->range_start, request->range_end);
adaptivedemux2: Do not submit_transfer when cancelled There is a race condition where transfer has not been submitted yet while the request is cancelled which leads to the transfer state going back to `DOWNLOAD_REQUEST_STATE_OPEN` and the user of the request to get signalled about its completion (and the task actually happening after it was cancelled) leading to assertions and misbehaviours. To ensure that this race can't happen, we start differentiating between the UNSENT and CANCELLED states as in the normal case, when entering `submit_request` the state is UNSENT and at that point we need to know that it is not because the request has been cancelled. In practice this case lead to an assertion in `gst_adaptive_demux2_stream_begin_download_uri` because in a previous call to `gst_adaptive_demux2_stream_stop_default` we cancelled the previous request and setup a new one while it had not been submitted yet and then got a `on_download_complete` callback called from that previous cancelled request and then we tried to do `download_request_set_uri` on a request that was still `in_use`, leading to something like: ``` #0: 0x0000000186655ec8 g_assert (request->in_use == FALSE)assert.c:0 #1: 0x00000001127236b8 libgstadaptivedemux2.dylib`download_request_set_uri(request=0x000060000017cc00, uri="https://XXX/chunk-stream1-00002.webm", range_start=0, range_end=-1) at downloadrequest.c:361 #2: 0x000000011271cee8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_begin_download_uri(stream=0x00000001330f1800, uri="https://XXX/chunk-stream1-00002.webm", start=0, end=-1) at gstadaptivedemux-stream.c:1447 #3: 0x0000000112719898 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment [inlined] gst_adaptive_demux2_stream_download_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:0 #4: 0x00000001127197f8 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_load_a_fragment(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:1969 #5: 0x000000011271c2a4 libgstadaptivedemux2.dylib`gst_adaptive_demux2_stream_next_download(stream=0x00000001330f1800) at gstadaptivedemux-stream.c:2112 ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5435>
2023-10-04 14:09:37 +00:00
request->state = DOWNLOAD_REQUEST_STATE_CANCELLED;
for (i = dh->active_transfers->len - 1; i >= 0; i--) {
GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
if (transfer->request == request) {
GST_DEBUG ("Found transfer %p for request for URI %s range %"
G_GINT64_FORMAT " %" G_GINT64_FORMAT, transfer, request->uri,
request->range_start, request->range_end);
g_cancellable_cancel (transfer->cancellable);
break;
}
}
out:
download_request_unlock (request);
g_mutex_unlock (&dh->transfer_lock);
}
DownloadRequest *
downloadhelper_fetch_uri_range (DownloadHelper * dh, const gchar * uri,
const gchar * referer, DownloadFlags flags, gint64 range_start,
gint64 range_end, GError ** err)
{
DownloadRequest *request;
g_return_val_if_fail (uri != NULL, NULL);
GST_DEBUG ("Fetching URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT,
uri, range_start, range_end);
flags |= DOWNLOAD_FLAG_BLOCKING;
request = download_request_new_uri_range (uri, range_start, range_end);
if (!downloadhelper_submit_request (dh, referer, flags, request, err)) {
download_request_unref (request);
return NULL;
}
return request;
}
DownloadRequest *
downloadhelper_fetch_uri (DownloadHelper * dh, const gchar * uri,
const gchar * referer, DownloadFlags flags, GError ** err)
{
return downloadhelper_fetch_uri_range (dh, uri, referer, flags, 0, -1, err);
}