/* GStreamer * Copyright (C) 2021-2022 Jan Schmidt * * downloadhelper.c: * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * Youshould have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ #include "downloadhelper.h" #include "../soup/gstsouploader.h" GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug); #define GST_CAT_DEFAULT adaptivedemux2_debug #define CHUNK_BUFFER_SIZE 32768 typedef struct DownloadHelperTransfer DownloadHelperTransfer; struct DownloadHelper { GThread *transfer_thread; gboolean running; GstAdaptiveDemuxClock *clock; GMainContext *transfer_context; GMainLoop *loop; SoupSession *session; GMutex transfer_lock; GArray *active_transfers; GAsyncQueue *transfer_requests; GSource *transfer_requests_source; gchar *referer; gchar *user_agent; GSList *cookies; }; struct DownloadHelperTransfer { DownloadHelper *dh; gboolean blocking; gboolean complete; gboolean progress_pending; GCond cond; GCancellable *cancellable; gboolean is_file_transfer; union { SoupMessage *msg; GFile *file; }; gboolean request_sent; /* Current read buffer */ char *read_buffer; guint64 read_buffer_size; guint64 read_position; /* Start in bytes of the read_buffer */ DownloadRequest *request; }; static void free_transfer (DownloadHelperTransfer * transfer) { DownloadRequest *request = transfer->request; if (request) download_request_unref (request); if (transfer->blocking) g_cond_clear (&transfer->cond); g_object_unref (transfer->msg); g_free (transfer->read_buffer); g_free (transfer); } static void transfer_completion_cb (gpointer src_object, GAsyncResult * res, gpointer user_data) { DownloadHelperTransfer *transfer = g_task_get_task_data ((GTask *) res); DownloadRequest *request = transfer->request; if (transfer->blocking) return; download_request_lock (request); request->in_use = FALSE; GST_LOG ("Despatching completion for transfer %p request %p", transfer, request); download_request_despatch_completion (request); download_request_unlock (request); } static gboolean transfer_report_progress_cb (gpointer task) { DownloadHelperTransfer *transfer; DownloadRequest *request; /* Already completed - late callback */ if (g_task_get_completed (task)) return FALSE; transfer = g_task_get_task_data (task); request = transfer->request; download_request_lock (request); if (request->send_progress) { GST_LOG ("Despatching progress for transfer %p request %p", transfer, request); download_request_despatch_progress (request); } transfer->progress_pending = FALSE; download_request_unlock (request); return FALSE; } static DownloadHelperTransfer * transfer_new_common (DownloadHelper * dh, DownloadRequest * request, gboolean blocking) { DownloadHelperTransfer *transfer = g_new0 (DownloadHelperTransfer, 1); transfer->blocking = blocking; if (transfer->blocking) g_cond_init (&transfer->cond); transfer->cancellable = g_cancellable_new (); transfer->request = download_request_ref (request); transfer->dh = dh; return transfer; } static GTask * transfer_task_new_for_transfer (DownloadHelperTransfer * transfer) { GTask *transfer_task = NULL; transfer_task = g_task_new (NULL, transfer->cancellable, (GAsyncReadyCallback) transfer_completion_cb, NULL); g_task_set_task_data (transfer_task, transfer, (GDestroyNotify) free_transfer); return transfer_task; } static GTask * transfer_task_new_file (DownloadHelper * dh, DownloadRequest * request, gboolean blocking) { DownloadHelperTransfer *transfer = transfer_new_common (dh, request, blocking); transfer->is_file_transfer = TRUE; transfer->file = g_file_new_for_uri (request->uri); return transfer_task_new_for_transfer (transfer); } static GTask * transfer_task_new_soup (DownloadHelper * dh, DownloadRequest * request, SoupMessage * msg, gboolean blocking) { DownloadHelperTransfer *transfer = transfer_new_common (dh, request, blocking); transfer->msg = msg; return transfer_task_new_for_transfer (transfer); } static void release_transfer_task_by_ref (GTask ** transfer_task) { g_object_unref (*transfer_task); } /* Called with download_request lock held */ static void transfer_task_report_progress (GTask * transfer_task) { DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task); DownloadRequest *request = transfer->request; GSource *idle_source; if (transfer->progress_pending == TRUE || !request->send_progress) return; /* There's no progress cb pending and this download wants reports, so * attach an idle source */ transfer->progress_pending = TRUE; idle_source = g_idle_source_new (); g_task_attach_source (transfer_task, idle_source, transfer_report_progress_cb); g_source_unref (idle_source); } static void finish_transfer_task (DownloadHelper * dh, GTask * transfer_task, GError * error) { int i; g_mutex_lock (&dh->transfer_lock); for (i = dh->active_transfers->len - 1; i >= 0; i--) { if (transfer_task == g_array_index (dh->active_transfers, GTask *, i)) { DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task); transfer->complete = TRUE; if (transfer->blocking) g_cond_broadcast (&transfer->cond); if (error != NULL) g_task_return_error (transfer_task, error); else g_task_return_boolean (transfer_task, TRUE); /* This drops the task ref: */ g_array_remove_index_fast (dh->active_transfers, i); g_mutex_unlock (&dh->transfer_lock); return; } } g_mutex_unlock (&dh->transfer_lock); GST_WARNING ("Did not find transfer %p in the active transfer list", transfer_task); } static gboolean new_read_buffer (DownloadHelperTransfer * transfer) { gint buffer_size = CHUNK_BUFFER_SIZE; if (transfer->is_file_transfer) { /* For file reads, handle range request limits here */ DownloadRequest *request = transfer->request; if (request->range_end != -1) { if (request->range_end <= transfer->read_position) { transfer->read_buffer = NULL; transfer->read_buffer_size = 0; return FALSE; } if (request->range_end - transfer->read_position < buffer_size) buffer_size = request->range_end - transfer->read_position + 1; } } transfer->read_buffer = g_new (char, buffer_size); transfer->read_buffer_size = buffer_size; return TRUE; } static void on_read_ready (GObject * source, GAsyncResult * result, gpointer user_data) { GTask *transfer_task = user_data; DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task); DownloadHelper *dh = transfer->dh; DownloadRequest *request = transfer->request; GInputStream *in = G_INPUT_STREAM (source); GError *error = NULL; gsize bytes_read = 0; GstClockTime now = gst_adaptive_demux_clock_get_time (dh->clock); gboolean read_failed = g_input_stream_read_all_finish (in, result, &bytes_read, &error); download_request_lock (request); if (error) { g_free (transfer->read_buffer); transfer->read_buffer = NULL; if (!g_cancellable_is_cancelled (transfer->cancellable)) { GST_ERROR ("Failed to read stream: %s", error->message); if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED) request->state = DOWNLOAD_REQUEST_STATE_ERROR; finish_transfer_task (dh, transfer_task, error); } else { /* Ignore error from cancelled operation */ g_error_free (error); finish_transfer_task (dh, transfer_task, NULL); } download_request_unlock (request); return; } if (bytes_read > 0) { GstBuffer *gst_buffer = gst_buffer_new_wrapped (transfer->read_buffer, bytes_read); GST_BUFFER_OFFSET (gst_buffer) = transfer->read_position; transfer->read_position += bytes_read; transfer->read_buffer = NULL; /* Clip the buffer to within the range */ if (GST_BUFFER_OFFSET (gst_buffer) < request->range_start) { if (transfer->read_position <= request->range_start) { GST_DEBUG ("Discarding %" G_GSIZE_FORMAT " bytes entirely before requested range", gst_buffer_get_size (gst_buffer)); /* This buffer is completely before the range start, discard it */ gst_buffer_unref (gst_buffer); gst_buffer = NULL; } else { GST_DEBUG ("Clipping first %" G_GINT64_FORMAT " bytes before requested range", request->range_start - GST_BUFFER_OFFSET (gst_buffer)); /* This buffer is partially within the requested range, clip the beginning */ gst_buffer_resize (gst_buffer, request->range_start - GST_BUFFER_OFFSET (gst_buffer), -1); GST_BUFFER_OFFSET (gst_buffer) = request->range_start; } } if (gst_buffer != NULL) { /* Don't override CANCELLED state. Otherwise make sure it is LOADING */ if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED) request->state = DOWNLOAD_REQUEST_STATE_LOADING; if (request->download_start_time == GST_CLOCK_TIME_NONE) { GST_LOG ("Got first data for URI %s", request->uri); request->download_start_time = now; } request->download_newest_data_time = now; GST_LOG ("Adding %u bytes to buffer (request URI %s)", (guint) (gst_buffer_get_size (gst_buffer)), request->uri); download_request_add_buffer (request, gst_buffer); transfer_task_report_progress (transfer_task); } } else if (read_failed) { /* The read failed and returned 0 bytes: We're done */ goto finish_transfer; } /* Resubmit the read request to get more */ if (!new_read_buffer (transfer)) goto finish_transfer; g_main_context_push_thread_default (dh->transfer_context); g_input_stream_read_all_async (in, transfer->read_buffer, transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable, on_read_ready, transfer_task); g_main_context_pop_thread_default (dh->transfer_context); download_request_unlock (request); return; finish_transfer: if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) { if (!transfer->is_file_transfer) { request->status_code = _soup_message_get_status (transfer->msg); } #ifndef GST_DISABLE_GST_DEBUG guint download_ms = (now - request->download_request_time) / GST_MSECOND; GST_LOG ("request complete in %u ms. Code %d URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT, download_ms, request->status_code, request->uri, request->range_start, request->range_end); #endif if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED) { if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code) || SOUP_STATUS_IS_REDIRECTION (request->status_code)) { request->state = DOWNLOAD_REQUEST_STATE_COMPLETE; } else { request->state = DOWNLOAD_REQUEST_STATE_ERROR; } } } request->download_end_time = now; g_free (transfer->read_buffer); transfer->read_buffer = NULL; download_request_unlock (request); finish_transfer_task (dh, transfer_task, NULL); } static void http_header_to_structure (const gchar * name, const gchar * value, gpointer user_data) { GstStructure *headers = user_data; const GValue *gv; if (!g_utf8_validate (name, -1, NULL) || !g_utf8_validate (value, -1, NULL)) return; gv = gst_structure_get_value (headers, name); if (gv && GST_VALUE_HOLDS_ARRAY (gv)) { GValue v = G_VALUE_INIT; g_value_init (&v, G_TYPE_STRING); g_value_set_string (&v, value); gst_value_array_append_value ((GValue *) gv, &v); g_value_unset (&v); } else if (gv && G_VALUE_HOLDS_STRING (gv)) { GValue arr = G_VALUE_INIT; GValue v = G_VALUE_INIT; const gchar *old_value = g_value_get_string (gv); g_value_init (&arr, GST_TYPE_ARRAY); g_value_init (&v, G_TYPE_STRING); g_value_set_string (&v, old_value); gst_value_array_append_value (&arr, &v); g_value_set_string (&v, value); gst_value_array_append_value (&arr, &v); gst_structure_set_value (headers, name, &arr); g_value_unset (&v); g_value_unset (&arr); } else { gst_structure_set (headers, name, G_TYPE_STRING, value, NULL); } } static void soup_msg_restarted_cb (SoupMessage * msg, gpointer user_data) { GTask *transfer_task = user_data; DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task); DownloadRequest *request = transfer->request; SoupStatus status = _soup_message_get_status (msg); if (SOUP_STATUS_IS_REDIRECTION (status)) { char *redirect_uri = gst_soup_message_uri_to_string (msg); gboolean redirect_permanent = (status == SOUP_STATUS_MOVED_PERMANENTLY); GST_DEBUG ("%u redirect to \"%s\" (permanent %d)", status, redirect_uri, redirect_permanent); download_request_lock (request); g_free (request->redirect_uri); request->redirect_uri = redirect_uri; request->redirect_permanent = redirect_permanent; download_request_unlock (request); } } static GstStructure * handle_response_headers (DownloadHelperTransfer * transfer) { DownloadRequest *request = transfer->request; SoupMessage *msg = transfer->msg; SoupMessageHeaders *response_headers; GstStructure *http_headers, *headers; http_headers = gst_structure_new_empty ("http-headers"); #if 0 if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED && src->proxy_id && src->proxy_pw) { /* wait for authenticate callback */ return GST_FLOW_OK; } if (src->redirection_uri) gst_structure_set (http_headers, "redirection-uri", G_TYPE_STRING, src->redirection_uri, NULL); #endif headers = gst_structure_new_empty ("request-headers"); _soup_message_headers_foreach (_soup_message_get_request_headers (msg), http_header_to_structure, headers); gst_structure_set (http_headers, "request-headers", GST_TYPE_STRUCTURE, headers, NULL); gst_structure_free (headers); headers = gst_structure_new_empty ("response-headers"); response_headers = _soup_message_get_response_headers (msg); _soup_message_headers_foreach (response_headers, http_header_to_structure, headers); gst_structure_set (http_headers, "response-headers", GST_TYPE_STRUCTURE, headers, NULL); gst_structure_free (headers); #if 0 if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) { /* force an error */ gst_structure_free (http_headers); return gst_soup_http_src_parse_status (msg, src); } #endif /* Parse Content-Length. */ if (SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (msg)) && (_soup_message_headers_get_encoding (response_headers) == SOUP_ENCODING_CONTENT_LENGTH)) { request->content_length = _soup_message_headers_get_content_length (response_headers); } /* Parse Content-Range in a partial content response to set our initial read_position */ transfer->read_position = 0; if (_soup_message_get_status (msg) == SOUP_STATUS_PARTIAL_CONTENT) { goffset start, end; if (_soup_message_headers_get_content_range (response_headers, &start, &end, NULL)) { GST_DEBUG ("Content-Range response %" G_GOFFSET_FORMAT "-%" G_GOFFSET_FORMAT, start, end); transfer->read_position = start; } } if (transfer->read_position != request->range_start) { GST_WARNING ("Server did not respect our range request for range %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT " - starting at offset %" G_GUINT64_FORMAT, request->range_start, request->range_end, transfer->read_position); } return http_headers; } static void on_file_ready (GObject * source, GAsyncResult * result, gpointer user_data) { GTask *transfer_task = user_data; DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task); DownloadHelper *dh = transfer->dh; DownloadRequest *request = transfer->request; GError *error = NULL; GFileInputStream *in = g_file_read_finish (transfer->file, result, &error); download_request_lock (request); if (in == NULL) { if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND)) { request->status_code = SOUP_STATUS_NOT_FOUND; } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED)) { request->status_code = SOUP_STATUS_FORBIDDEN; } else { request->status_code = SOUP_STATUS_INTERNAL_SERVER_ERROR; } if (!g_cancellable_is_cancelled (transfer->cancellable)) { GST_LOG ("request errored. Code %d URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT, request->status_code, request->uri, request->range_start, request->range_end); if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED) request->state = DOWNLOAD_REQUEST_STATE_ERROR; finish_transfer_task (dh, transfer_task, error); } else { /* Ignore error from cancelled operation */ g_error_free (error); finish_transfer_task (dh, transfer_task, NULL); } download_request_unlock (request); /* No async callback queued - the transfer is done */ finish_transfer_task (dh, transfer_task, error); return; } /* If the state is cancelled don't override it */ if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED && request->state != DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED) { request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED; request->status_code = SOUP_STATUS_OK; // FIXME: request->headers = handle_response_headers (transfer); GST_TRACE ("request URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT " headers: %" GST_PTR_FORMAT, request->uri, request->range_start, request->range_end, request->headers); if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code) || SOUP_STATUS_IS_REDIRECTION (request->status_code)) { request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED; transfer_task_report_progress (transfer_task); } else { goto finish_transfer_error; } } if (!new_read_buffer (transfer)) goto finish_transfer_error; /* Respect any range request */ if (request->range_start != 0) { if (!g_seekable_seek (G_SEEKABLE (in), request->range_start, G_SEEK_SET, transfer->cancellable, &error)) { goto finish_transfer_error; } transfer->read_position = request->range_start; } download_request_unlock (request); g_main_context_push_thread_default (dh->transfer_context); g_input_stream_read_all_async (G_INPUT_STREAM (in), transfer->read_buffer, transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable, on_read_ready, transfer_task); g_main_context_pop_thread_default (dh->transfer_context); g_object_unref (in); return; finish_transfer_error: request->download_end_time = gst_adaptive_demux_clock_get_time (dh->clock); if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) { GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT, request->status_code, request->uri, request->range_start, request->range_end); /* If the state is cancelled don't override it */ if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED) request->state = DOWNLOAD_REQUEST_STATE_ERROR; } g_free (transfer->read_buffer); transfer->read_buffer = NULL; download_request_unlock (request); finish_transfer_task (dh, transfer_task, NULL); g_object_unref (in); } static void on_request_sent (GObject * source, GAsyncResult * result, gpointer user_data) { GTask *transfer_task = user_data; DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task); DownloadHelper *dh = transfer->dh; DownloadRequest *request = transfer->request; SoupMessage *msg = transfer->msg; GError *error = NULL; GInputStream *in = _soup_session_send_finish ((SoupSession *) source, result, &error); download_request_lock (request); if (in == NULL) { request->status_code = _soup_message_get_status (msg); if (!g_cancellable_is_cancelled (transfer->cancellable)) { GST_LOG ("request errored. Code %d URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT, request->status_code, request->uri, request->range_start, request->range_end); if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED) request->state = DOWNLOAD_REQUEST_STATE_ERROR; finish_transfer_task (dh, transfer_task, error); } else { /* Ignore error from cancelled operation */ g_error_free (error); finish_transfer_task (dh, transfer_task, NULL); } download_request_unlock (request); /* No async callback queued - the transfer is done */ finish_transfer_task (dh, transfer_task, error); return; } /* If the state is cancelled don't override it */ if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED && request->state != DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED) { request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED; request->status_code = _soup_message_get_status (msg); request->headers = handle_response_headers (transfer); GST_TRACE ("request URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT " headers: %" GST_PTR_FORMAT, request->uri, request->range_start, request->range_end, request->headers); if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code) || SOUP_STATUS_IS_REDIRECTION (request->status_code)) { request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED; transfer_task_report_progress (transfer_task); } else { goto finish_transfer_error; } } if (!new_read_buffer (transfer)) goto finish_transfer_error; download_request_unlock (request); g_main_context_push_thread_default (dh->transfer_context); g_input_stream_read_all_async (in, transfer->read_buffer, transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable, on_read_ready, transfer_task); g_main_context_pop_thread_default (dh->transfer_context); g_object_unref (in); return; finish_transfer_error: request->download_end_time = gst_adaptive_demux_clock_get_time (dh->clock); if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) { GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT, _soup_message_get_status (msg), request->uri, request->range_start, request->range_end); /* If the state is cancelled don't override it */ if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED) request->state = DOWNLOAD_REQUEST_STATE_ERROR; } g_free (transfer->read_buffer); transfer->read_buffer = NULL; download_request_unlock (request); finish_transfer_task (dh, transfer_task, NULL); g_object_unref (in); } static inline gchar gst_soup_util_log_make_level_tag (SoupLoggerLogLevel level) { gchar c; if (G_UNLIKELY ((gint) level > 9)) return '?'; switch (level) { case SOUP_LOGGER_LOG_MINIMAL: c = 'M'; break; case SOUP_LOGGER_LOG_HEADERS: c = 'H'; break; case SOUP_LOGGER_LOG_BODY: c = 'B'; break; default: /* Unknown level. If this is hit libsoup likely added a new * log level to SoupLoggerLogLevel and it should be added * as a case */ c = level + '0'; break; } return c; } static void gst_soup_util_log_printer_cb (SoupLogger G_GNUC_UNUSED * logger, SoupLoggerLogLevel level, char direction, const char *data, gpointer user_data) { gchar c; c = gst_soup_util_log_make_level_tag (level); GST_TRACE ("HTTP_SESSION(%c): %c %s", c, direction, data); } DownloadHelper * downloadhelper_new (GstAdaptiveDemuxClock * clock) { DownloadHelper *dh = g_new0 (DownloadHelper, 1); dh->transfer_context = g_main_context_new (); dh->loop = g_main_loop_new (dh->transfer_context, FALSE); dh->clock = gst_adaptive_demux_clock_ref (clock); g_mutex_init (&dh->transfer_lock); dh->active_transfers = g_array_new (FALSE, FALSE, sizeof (GTask *)); g_array_set_clear_func (dh->active_transfers, (GDestroyNotify) (release_transfer_task_by_ref)); dh->transfer_requests = g_async_queue_new_full ((GDestroyNotify) g_object_unref); dh->transfer_requests_source = NULL; /* libsoup 3.0 (not 2.74 or 3.1) dispatches using a single source attached * when the session is created, so we need to ensure it matches here. */ g_main_context_push_thread_default (dh->transfer_context); /* Set 10 second timeout. Any longer is likely * an attempt to reuse an already closed connection */ dh->session = _soup_session_new_with_options ("timeout", 10, NULL); /* Setup soup header debugging if we are at GST_LEVEL_TRACE */ if (gst_debug_category_get_threshold (GST_CAT_DEFAULT) >= GST_LEVEL_TRACE) { /* Create a new logger and set body_size_limit to -1 (no limit) */ SoupLogger *logger = _soup_logger_new (SOUP_LOGGER_LOG_HEADERS); _soup_logger_set_printer (logger, gst_soup_util_log_printer_cb, NULL, NULL); /* Attach logger to session */ _soup_session_add_feature (dh->session, (SoupSessionFeature *) logger); g_object_unref (logger); } g_main_context_pop_thread_default (dh->transfer_context); return dh; } void downloadhelper_free (DownloadHelper * dh) { downloadhelper_stop (dh); if (dh->session) g_object_unref (dh->session); g_main_loop_unref (dh->loop); g_main_context_unref (dh->transfer_context); if (dh->clock) gst_adaptive_demux_clock_unref (dh->clock); g_array_free (dh->active_transfers, TRUE); g_async_queue_unref (dh->transfer_requests); g_free (dh->referer); g_free (dh->user_agent); _soup_cookies_free (dh->cookies); g_free (dh); } void downloadhelper_set_referer (DownloadHelper * dh, const gchar * referer) { g_mutex_lock (&dh->transfer_lock); g_free (dh->referer); dh->referer = g_strdup (referer); g_mutex_unlock (&dh->transfer_lock); } void downloadhelper_set_user_agent (DownloadHelper * dh, const gchar * user_agent) { g_mutex_lock (&dh->transfer_lock); g_free (dh->user_agent); dh->user_agent = g_strdup (user_agent); g_mutex_unlock (&dh->transfer_lock); } /* Takes ownership of the strv */ void downloadhelper_set_cookies (DownloadHelper * dh, gchar ** cookies) { guint i; g_mutex_lock (&dh->transfer_lock); _soup_cookies_free (dh->cookies); dh->cookies = NULL; for (i = 0; cookies[i]; i++) { SoupCookie *cookie = _soup_cookie_parse (cookies[i]); if (cookie == NULL) { GST_WARNING ("Couldn't parse cookie, ignoring: %s", cookies[i]); continue; } dh->cookies = g_slist_append (dh->cookies, cookie); } g_mutex_unlock (&dh->transfer_lock); g_strfreev (cookies); } /* Called with the transfer lock held */ static void submit_transfer (DownloadHelper * dh, GTask * transfer_task) { DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task); DownloadRequest *request = transfer->request; download_request_lock (request); if (request->state == DOWNLOAD_REQUEST_STATE_CANCELLED) { download_request_unlock (request); GST_DEBUG ("Don't submit already cancelled transfer"); return; } request->state = DOWNLOAD_REQUEST_STATE_OPEN; request->download_request_time = gst_adaptive_demux_clock_get_time (dh->clock); GST_LOG ("Submitting request URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT, request->uri, request->range_start, request->range_end); transfer_task_report_progress (transfer_task); download_request_unlock (request); if (transfer->is_file_transfer) { g_file_read_async (transfer->file, G_PRIORITY_DEFAULT, transfer->cancellable, on_file_ready, transfer_task); } else { _soup_session_send_async (dh->session, transfer->msg, transfer->cancellable, on_request_sent, transfer_task); } g_array_append_val (dh->active_transfers, transfer_task); } /* Idle callback that submits all pending transfers */ static gboolean submit_transfers_cb (DownloadHelper * dh) { GTask *transfer; g_mutex_lock (&dh->transfer_lock); do { transfer = g_async_queue_try_pop (dh->transfer_requests); if (transfer) { submit_transfer (dh, transfer); } } while (transfer != NULL); /* FIXME: Use a PollFD like GWakeup instead? */ g_source_destroy (dh->transfer_requests_source); g_source_unref (dh->transfer_requests_source); dh->transfer_requests_source = NULL; g_mutex_unlock (&dh->transfer_lock); return G_SOURCE_REMOVE; } static gpointer dh_transfer_thread_func (gpointer data) { DownloadHelper *dh = data; GST_DEBUG ("DownloadHelper thread starting"); g_main_context_push_thread_default (dh->transfer_context); g_main_loop_run (dh->loop); g_main_context_pop_thread_default (dh->transfer_context); GST_DEBUG ("Exiting DownloadHelper thread"); return NULL; } gboolean downloadhelper_start (DownloadHelper * dh) { g_return_val_if_fail (dh->transfer_thread == NULL, FALSE); g_mutex_lock (&dh->transfer_lock); if (!dh->running) { dh->transfer_thread = g_thread_try_new ("adaptive-download-task", dh_transfer_thread_func, dh, NULL); dh->running = (dh->transfer_thread != NULL); } g_mutex_unlock (&dh->transfer_lock); return dh->running; } void downloadhelper_stop (DownloadHelper * dh) { int i; GThread *transfer_thread = NULL; GST_DEBUG ("Stopping DownloadHelper loop"); g_mutex_lock (&dh->transfer_lock); dh->running = FALSE; for (i = 0; i < dh->active_transfers->len; i++) { GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i); DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task); g_cancellable_cancel (transfer->cancellable); } g_main_loop_quit (dh->loop); transfer_thread = dh->transfer_thread; dh->transfer_thread = NULL; g_mutex_unlock (&dh->transfer_lock); if (transfer_thread != NULL) { g_thread_join (transfer_thread); } /* The transfer thread has exited at this point - any remaining transfers are unfinished * and need cleaning up */ g_mutex_lock (&dh->transfer_lock); for (i = 0; i < dh->active_transfers->len; i++) { GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i); DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task); DownloadRequest *request = transfer->request; download_request_lock (request); request->state = DOWNLOAD_REQUEST_STATE_CANCELLED; download_request_unlock (request); transfer->complete = TRUE; if (transfer->blocking) g_cond_broadcast (&transfer->cond); g_task_return_boolean (transfer_task, TRUE); } g_array_set_size (dh->active_transfers, 0); g_mutex_unlock (&dh->transfer_lock); } gboolean downloadhelper_submit_request (DownloadHelper * dh, const gchar * referer, DownloadFlags flags, DownloadRequest * request, GError ** err) { GTask *transfer_task = NULL; gboolean blocking = (flags & DOWNLOAD_FLAG_BLOCKING) != 0; download_request_lock (request); if (request->in_use) { GST_ERROR ("Request for URI %s reusing active request object", request->uri); download_request_unlock (request); return FALSE; } /* Clear the state back to unsent */ request->state = DOWNLOAD_REQUEST_STATE_UNSENT; if (g_str_has_prefix (request->uri, "file://")) { if (flags & DOWNLOAD_FLAG_HEADERS_ONLY) { /* FIXME: Implement fake header only requests for file URIs? */ GST_ERROR ("file:// URIs do not support header-only requests"); download_request_unlock (request); return FALSE; } download_request_unlock (request); /* If resubmitting a request, clear any stale / unused data */ download_request_begin_download (request); transfer_task = transfer_task_new_file (dh, request, blocking); g_mutex_lock (&dh->transfer_lock); } else { const gchar *method = (flags & DOWNLOAD_FLAG_HEADERS_ONLY) ? SOUP_METHOD_HEAD : SOUP_METHOD_GET; SoupMessage *msg = _soup_message_new (method, request->uri); if (msg == NULL) { g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI, "Could not parse download URI %s", request->uri); request->state = DOWNLOAD_REQUEST_STATE_ERROR; download_request_unlock (request); return FALSE; } /* NOTE: There was a bug where Akamai servers return the * wrong result for a range request on small files. To avoid * it if the range starts within the first KB of the file, just * start at 0 instead */ if (request->range_start < 1024) request->range_start = 0; SoupMessageHeaders *msg_headers = _soup_message_get_request_headers (msg); if (request->range_start != 0 || request->range_end != -1) { _soup_message_headers_set_range (msg_headers, request->range_start, request->range_end); } download_request_unlock (request); /* If resubmitting a request, clear any stale / unused data */ download_request_begin_download (request); if ((flags & DOWNLOAD_FLAG_COMPRESS) == 0) { _soup_message_disable_feature (msg, _soup_content_decoder_get_type ()); } if (flags & DOWNLOAD_FLAG_FORCE_REFRESH) { _soup_message_headers_append (msg_headers, "Cache-Control", "max-age=0"); } /* Take the lock to protect header strings */ g_mutex_lock (&dh->transfer_lock); if (referer != NULL) { _soup_message_headers_append (msg_headers, "Referer", referer); } else if (dh->referer != NULL) { _soup_message_headers_append (msg_headers, "Referer", dh->referer); } if (dh->user_agent != NULL) { _soup_message_headers_append (msg_headers, "User-Agent", dh->user_agent); } if (dh->cookies != NULL) { _soup_cookies_to_request (dh->cookies, msg); } transfer_task = transfer_task_new_soup (dh, request, msg, blocking); g_signal_connect (msg, "restarted", G_CALLBACK (soup_msg_restarted_cb), transfer_task); } if (!dh->running) { /* The download helper was deactivated just as we went to dispatch this request. * Abort and manually wake the request, as it never went in the active_transfer list */ g_mutex_unlock (&dh->transfer_lock); download_request_lock (request); request->state = DOWNLOAD_REQUEST_STATE_UNSENT; request->in_use = FALSE; download_request_unlock (request); g_cancellable_cancel (g_task_get_cancellable (transfer_task)); g_task_return_error_if_cancelled (transfer_task); g_object_unref (transfer_task); return FALSE; } download_request_lock (request); request->in_use = TRUE; download_request_unlock (request); /* Now send the request over to the main loop for actual submission */ GST_LOG ("Submitting transfer task %p", transfer_task); g_async_queue_push (dh->transfer_requests, transfer_task); /* No pending idle source to wake the transfer loop - so create one */ if (dh->transfer_requests_source == NULL) { dh->transfer_requests_source = g_idle_source_new (); g_source_set_callback (dh->transfer_requests_source, (GSourceFunc) submit_transfers_cb, dh, NULL); g_source_attach (dh->transfer_requests_source, dh->transfer_context); } if (blocking) { DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task); /* We need an extra ref on the task to make sure it stays alive. * We pushed it in the async queue, but didn't unlock yet, so while * we gave away our ref, the receiver can't have unreffed it */ g_object_ref (transfer_task); while (!transfer->complete) g_cond_wait (&transfer->cond, &dh->transfer_lock); g_object_unref (transfer_task); } g_mutex_unlock (&dh->transfer_lock); return TRUE; } void downloadhelper_cancel_request (DownloadHelper * dh, DownloadRequest * request) { int i; g_mutex_lock (&dh->transfer_lock); download_request_lock (request); if (!request->in_use) goto out; GST_DEBUG ("Cancelling request for URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT, request->uri, request->range_start, request->range_end); request->state = DOWNLOAD_REQUEST_STATE_CANCELLED; for (i = dh->active_transfers->len - 1; i >= 0; i--) { GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i); DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task); if (transfer->request == request) { GST_DEBUG ("Found transfer %p for request for URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT, transfer, request->uri, request->range_start, request->range_end); g_cancellable_cancel (transfer->cancellable); break; } } out: download_request_unlock (request); g_mutex_unlock (&dh->transfer_lock); } DownloadRequest * downloadhelper_fetch_uri_range (DownloadHelper * dh, const gchar * uri, const gchar * referer, DownloadFlags flags, gint64 range_start, gint64 range_end, GError ** err) { DownloadRequest *request; g_return_val_if_fail (uri != NULL, NULL); GST_DEBUG ("Fetching URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT, uri, range_start, range_end); flags |= DOWNLOAD_FLAG_BLOCKING; request = download_request_new_uri_range (uri, range_start, range_end); if (!downloadhelper_submit_request (dh, referer, flags, request, err)) { download_request_unref (request); return NULL; } return request; } DownloadRequest * downloadhelper_fetch_uri (DownloadHelper * dh, const gchar * uri, const gchar * referer, DownloadFlags flags, GError ** err) { return downloadhelper_fetch_uri_range (dh, uri, referer, flags, 0, -1, err); }