adaptivedemux2: Implement file:// URI handling

Add the ability to play HLS and DASH from local files

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6969>
This commit is contained in:
Jan Schmidt 2024-05-30 00:31:38 +03:00 committed by GStreamer Marge Bot
parent 9dc1d68e2f
commit f2a18ab277
2 changed files with 251 additions and 85 deletions

View file

@ -64,7 +64,14 @@ struct DownloadHelperTransfer
GCancellable *cancellable;
SoupMessage *msg;
gboolean is_file_transfer;
union
{
SoupMessage *msg;
GFile *file;
};
gboolean request_sent;
/* Current read buffer */
@ -134,11 +141,10 @@ transfer_report_progress_cb (gpointer task)
return FALSE;
}
static GTask *
transfer_task_new (DownloadHelper * dh, DownloadRequest * request,
SoupMessage * msg, gboolean blocking)
static DownloadHelperTransfer *
transfer_new_common (DownloadHelper * dh, DownloadRequest * request,
gboolean blocking)
{
GTask *transfer_task = NULL;
DownloadHelperTransfer *transfer = g_new0 (DownloadHelperTransfer, 1);
transfer->blocking = blocking;
@ -149,8 +155,13 @@ transfer_task_new (DownloadHelper * dh, DownloadRequest * request,
transfer->request = download_request_ref (request);
transfer->dh = dh;
transfer->msg = msg;
return transfer;
}
static GTask *
transfer_task_new_for_transfer (DownloadHelperTransfer * transfer)
{
GTask *transfer_task = NULL;
transfer_task =
g_task_new (NULL, transfer->cancellable,
(GAsyncReadyCallback) transfer_completion_cb, NULL);
@ -160,6 +171,28 @@ transfer_task_new (DownloadHelper * dh, DownloadRequest * request,
return transfer_task;
}
static GTask *
transfer_task_new_file (DownloadHelper * dh, DownloadRequest * request,
gboolean blocking)
{
DownloadHelperTransfer *transfer =
transfer_new_common (dh, request, blocking);
transfer->is_file_transfer = TRUE;
transfer->file = g_file_new_for_uri (request->uri);
return transfer_task_new_for_transfer (transfer);
}
static GTask *
transfer_task_new_soup (DownloadHelper * dh, DownloadRequest * request,
SoupMessage * msg, gboolean blocking)
{
DownloadHelperTransfer *transfer =
transfer_new_common (dh, request, blocking);
transfer->msg = msg;
return transfer_task_new_for_transfer (transfer);
}
static void
release_transfer_task_by_ref (GTask ** transfer_task)
{
@ -223,19 +256,21 @@ static gboolean
new_read_buffer (DownloadHelperTransfer * transfer)
{
gint buffer_size = CHUNK_BUFFER_SIZE;
#if 0
DownloadRequest *request = transfer->request;
if (request->range_end != -1) {
if (request->range_end <= transfer->read_position) {
transfer->read_buffer = NULL;
transfer->read_buffer_size = 0;
return FALSE;
if (transfer->is_file_transfer) {
/* For file reads, handle range request limits here */
DownloadRequest *request = transfer->request;
if (request->range_end != -1) {
if (request->range_end <= transfer->read_position) {
transfer->read_buffer = NULL;
transfer->read_buffer_size = 0;
return FALSE;
}
if (request->range_end - transfer->read_position < buffer_size)
buffer_size = request->range_end - transfer->read_position + 1;
}
if (request->range_end - transfer->read_position < buffer_size)
buffer_size = request->range_end - transfer->read_position + 1;
}
#endif
transfer->read_buffer = g_new (char, buffer_size);
transfer->read_buffer_size = buffer_size;
@ -348,17 +383,19 @@ on_read_ready (GObject * source, GAsyncResult * result, gpointer user_data)
finish_transfer:
if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
SoupStatus status_code = _soup_message_get_status (transfer->msg);
if (!transfer->is_file_transfer) {
request->status_code = _soup_message_get_status (transfer->msg);
}
#ifndef GST_DISABLE_GST_DEBUG
guint download_ms = (now - request->download_request_time) / GST_MSECOND;
GST_LOG ("request complete in %u ms. Code %d URI %s range %" G_GINT64_FORMAT
" %" G_GINT64_FORMAT, download_ms, status_code,
" %" G_GINT64_FORMAT, download_ms, request->status_code,
request->uri, request->range_start, request->range_end);
#endif
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED) {
if (SOUP_STATUS_IS_SUCCESSFUL (status_code)
|| SOUP_STATUS_IS_REDIRECTION (status_code)) {
if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code)
|| SOUP_STATUS_IS_REDIRECTION (request->status_code)) {
request->state = DOWNLOAD_REQUEST_STATE_COMPLETE;
} else {
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
@ -508,6 +545,115 @@ handle_response_headers (DownloadHelperTransfer * transfer)
return http_headers;
}
static void
on_file_ready (GObject * source, GAsyncResult * result, gpointer user_data)
{
GTask *transfer_task = user_data;
DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
DownloadHelper *dh = transfer->dh;
DownloadRequest *request = transfer->request;
GError *error = NULL;
GFileInputStream *in = g_file_read_finish (transfer->file, result, &error);
download_request_lock (request);
if (in == NULL) {
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND)) {
request->status_code = SOUP_STATUS_NOT_FOUND;
} else if (g_error_matches (error, G_IO_ERROR,
G_IO_ERROR_PERMISSION_DENIED)) {
request->status_code = SOUP_STATUS_FORBIDDEN;
} else {
request->status_code = SOUP_STATUS_INTERNAL_SERVER_ERROR;
}
if (!g_cancellable_is_cancelled (transfer->cancellable)) {
GST_LOG ("request errored. Code %d URI %s range %" G_GINT64_FORMAT " %"
G_GINT64_FORMAT, request->status_code, request->uri,
request->range_start, request->range_end);
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED)
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
finish_transfer_task (dh, transfer_task, error);
} else {
/* Ignore error from cancelled operation */
g_error_free (error);
finish_transfer_task (dh, transfer_task, NULL);
}
download_request_unlock (request);
/* No async callback queued - the transfer is done */
finish_transfer_task (dh, transfer_task, error);
return;
}
/* If the state is cancelled don't override it */
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED &&
request->state != DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED) {
request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
request->status_code = SOUP_STATUS_OK;
// FIXME: request->headers = handle_response_headers (transfer);
GST_TRACE ("request URI %s range %" G_GINT64_FORMAT " %"
G_GINT64_FORMAT " headers: %" GST_PTR_FORMAT,
request->uri, request->range_start, request->range_end,
request->headers);
if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code)
|| SOUP_STATUS_IS_REDIRECTION (request->status_code)) {
request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
transfer_task_report_progress (transfer_task);
} else {
goto finish_transfer_error;
}
}
if (!new_read_buffer (transfer))
goto finish_transfer_error;
/* Respect any range request */
if (request->range_start != 0) {
if (!g_seekable_seek (G_SEEKABLE (in), request->range_start, G_SEEK_SET,
transfer->cancellable, &error)) {
goto finish_transfer_error;
}
transfer->read_position = request->range_start;
}
download_request_unlock (request);
g_main_context_push_thread_default (dh->transfer_context);
g_input_stream_read_all_async (G_INPUT_STREAM (in), transfer->read_buffer,
transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
on_read_ready, transfer_task);
g_main_context_pop_thread_default (dh->transfer_context);
g_object_unref (in);
return;
finish_transfer_error:
request->download_end_time = gst_adaptive_demux_clock_get_time (dh->clock);
if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %"
G_GINT64_FORMAT, request->status_code, request->uri,
request->range_start, request->range_end);
/* If the state is cancelled don't override it */
if (request->state != DOWNLOAD_REQUEST_STATE_CANCELLED)
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
}
g_free (transfer->read_buffer);
transfer->read_buffer = NULL;
download_request_unlock (request);
finish_transfer_task (dh, transfer_task, NULL);
g_object_unref (in);
}
static void
on_request_sent (GObject * source, GAsyncResult * result, gpointer user_data)
{
@ -776,8 +922,13 @@ submit_transfer (DownloadHelper * dh, GTask * transfer_task)
transfer_task_report_progress (transfer_task);
download_request_unlock (request);
_soup_session_send_async (dh->session, transfer->msg, transfer->cancellable,
on_request_sent, transfer_task);
if (transfer->is_file_transfer) {
g_file_read_async (transfer->file, G_PRIORITY_DEFAULT,
transfer->cancellable, on_file_ready, transfer_task);
} else {
_soup_session_send_async (dh->session, transfer->msg, transfer->cancellable,
on_request_sent, transfer_task);
}
g_array_append_val (dh->active_transfers, transfer_task);
}
@ -896,14 +1047,8 @@ downloadhelper_submit_request (DownloadHelper * dh,
GError ** err)
{
GTask *transfer_task = NULL;
const gchar *method;
SoupMessage *msg;
SoupMessageHeaders *msg_headers;
gboolean blocking = (flags & DOWNLOAD_FLAG_BLOCKING) != 0;
method =
(flags & DOWNLOAD_FLAG_HEADERS_ONLY) ? SOUP_METHOD_HEAD : SOUP_METHOD_GET;
download_request_lock (request);
if (request->in_use) {
GST_ERROR ("Request for URI %s reusing active request object",
@ -915,62 +1060,85 @@ downloadhelper_submit_request (DownloadHelper * dh,
/* Clear the state back to unsent */
request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
msg = _soup_message_new (method, request->uri);
if (msg == NULL) {
g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
"Could not parse download URI %s", request->uri);
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
if (g_str_has_prefix (request->uri, "file://")) {
if (flags & DOWNLOAD_FLAG_HEADERS_ONLY) {
/* FIXME: Implement fake header only requests for file URIs? */
GST_ERROR ("file:// URIs do not support header-only requests");
download_request_unlock (request);
return FALSE;
}
download_request_unlock (request);
return FALSE;
/* If resubmitting a request, clear any stale / unused data */
download_request_begin_download (request);
transfer_task = transfer_task_new_file (dh, request, blocking);
g_mutex_lock (&dh->transfer_lock);
} else {
const gchar *method =
(flags & DOWNLOAD_FLAG_HEADERS_ONLY) ? SOUP_METHOD_HEAD :
SOUP_METHOD_GET;
SoupMessage *msg = _soup_message_new (method, request->uri);
if (msg == NULL) {
g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
"Could not parse download URI %s", request->uri);
request->state = DOWNLOAD_REQUEST_STATE_ERROR;
download_request_unlock (request);
return FALSE;
}
/* NOTE: There was a bug where Akamai servers return the
* wrong result for a range request on small files. To avoid
* it if the range starts within the first KB of the file, just
* start at 0 instead */
if (request->range_start < 1024)
request->range_start = 0;
SoupMessageHeaders *msg_headers = _soup_message_get_request_headers (msg);
if (request->range_start != 0 || request->range_end != -1) {
_soup_message_headers_set_range (msg_headers, request->range_start,
request->range_end);
}
download_request_unlock (request);
/* If resubmitting a request, clear any stale / unused data */
download_request_begin_download (request);
if ((flags & DOWNLOAD_FLAG_COMPRESS) == 0) {
_soup_message_disable_feature (msg, _soup_content_decoder_get_type ());
}
if (flags & DOWNLOAD_FLAG_FORCE_REFRESH) {
_soup_message_headers_append (msg_headers, "Cache-Control", "max-age=0");
}
/* Take the lock to protect header strings */
g_mutex_lock (&dh->transfer_lock);
if (referer != NULL) {
_soup_message_headers_append (msg_headers, "Referer", referer);
} else if (dh->referer != NULL) {
_soup_message_headers_append (msg_headers, "Referer", dh->referer);
}
if (dh->user_agent != NULL) {
_soup_message_headers_append (msg_headers, "User-Agent", dh->user_agent);
}
if (dh->cookies != NULL) {
_soup_cookies_to_request (dh->cookies, msg);
}
transfer_task = transfer_task_new_soup (dh, request, msg, blocking);
g_signal_connect (msg, "restarted", G_CALLBACK (soup_msg_restarted_cb),
transfer_task);
}
/* NOTE: There was a bug where Akamai servers return the
* wrong result for a range request on small files. To avoid
* it if the range starts within the first KB of the file, just
* start at 0 instead */
if (request->range_start < 1024)
request->range_start = 0;
msg_headers = _soup_message_get_request_headers (msg);
if (request->range_start != 0 || request->range_end != -1) {
_soup_message_headers_set_range (msg_headers, request->range_start,
request->range_end);
}
download_request_unlock (request);
/* If resubmitting a request, clear any stale / unused data */
download_request_begin_download (request);
if ((flags & DOWNLOAD_FLAG_COMPRESS) == 0) {
_soup_message_disable_feature (msg, _soup_content_decoder_get_type ());
}
if (flags & DOWNLOAD_FLAG_FORCE_REFRESH) {
_soup_message_headers_append (msg_headers, "Cache-Control", "max-age=0");
}
/* Take the lock to protect header strings */
g_mutex_lock (&dh->transfer_lock);
if (referer != NULL) {
_soup_message_headers_append (msg_headers, "Referer", referer);
} else if (dh->referer != NULL) {
_soup_message_headers_append (msg_headers, "Referer", dh->referer);
}
if (dh->user_agent != NULL) {
_soup_message_headers_append (msg_headers, "User-Agent", dh->user_agent);
}
if (dh->cookies != NULL) {
_soup_cookies_to_request (dh->cookies, msg);
}
transfer_task = transfer_task_new (dh, request, msg, blocking);
if (!dh->running) {
/* The download helper was deactivated just as we went to dispatch this request.
* Abort and manually wake the request, as it never went in the active_transfer list */
@ -992,9 +1160,6 @@ downloadhelper_submit_request (DownloadHelper * dh,
request->in_use = TRUE;
download_request_unlock (request);
g_signal_connect (msg, "restarted", G_CALLBACK (soup_msg_restarted_cb),
transfer_task);
/* Now send the request over to the main loop for actual submission */
GST_LOG ("Submitting transfer task %p", transfer_task);
g_async_queue_push (dh->transfer_requests, transfer_task);

View file

@ -983,10 +983,11 @@ handle_incoming_manifest (GstAdaptiveDemux * demux)
if (!g_str_has_prefix (demux->manifest_uri, "data:")
&& !g_str_has_prefix (demux->manifest_uri, "http://")
&& !g_str_has_prefix (demux->manifest_uri, "https://")) {
&& !g_str_has_prefix (demux->manifest_uri, "https://")
&& !g_str_has_prefix (demux->manifest_uri, "file://")) {
GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
(_("Invalid manifest URI")),
("Manifest URI needs to use either data:, http:// or https://"));
("Manifest URI needs to use either data:, http://, https:// or file://"));
gst_query_unref (query);
ret = FALSE;
goto unlock_out;