gstreamer/subprojects/gst-plugins-good/ext/adaptivedemux2/hls/gsthlsdemux-preloader.c
Jan Schmidt a7e5236a39 hlsdemux2/preloader: Implement basic request handling
Implement fulfilment of HTTP requests from the active preload downloads by
finding any preload request that can provide the requested data and feeding
bytes from the internal DownloadRequest to the caller provided target
DownloadRequest.

Doesn't yet calculate timestamps to make the target request have a sensible
apparent bitrate.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3883>
2023-02-03 16:52:22 +00:00

486 lines
18 KiB
C

/* GStreamer
Copyright (C) 2022 Jan Schmidt <jan@centricular.com>
*
* gsthlsdemux-preloader.c:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "gsthlsdemux-preloader.h"
/* Everything is called from the scheduler thread, including
* download handling callbacks */
GST_DEBUG_CATEGORY_EXTERN (gst_hls_demux2_debug);
#define GST_CAT_DEFAULT gst_hls_demux2_debug
typedef struct _GstHLSDemuxPreloadRequest GstHLSDemuxPreloadRequest;
struct _GstHLSDemuxPreloadRequest
{
GstHLSDemuxPreloader *preloader; /* Parent preloader */
GstM3U8PreloadHint *hint;
DownloadRequest *download_request;
gboolean download_is_finished; /* TRUE if the input download request completed / failed */
guint64 download_cur_offset; /* offset of the next expected received data */
guint64 download_content_length; /* Content length (filled in when response headers arrive */
/* FIXME: Support multiple target requests? I don't think that can happen in practice,
* since we only download one segment at a time, and MAP requests are distinct from PART requests. */
guint64 target_cur_offset; /* offset of the next delivered target data */
DownloadRequest *target_request;
};
static GstHLSDemuxPreloadRequest *
gst_hls_demux_preload_request_new (GstHLSDemuxPreloader * preloader,
GstM3U8PreloadHint * hint)
{
GstHLSDemuxPreloadRequest *req = g_new0 (GstHLSDemuxPreloadRequest, 1);
req->preloader = preloader;
req->hint = gst_m3u8_preload_hint_ref (hint);
return req;
};
static void
gst_hls_demux_preload_request_free (GstHLSDemuxPreloadRequest * req)
{
gst_m3u8_preload_hint_unref (req->hint);
if (req->download_request != NULL) {
/* The download request must have been cancelled by the preload helper */
g_assert (req->download_request->in_use == FALSE);
download_request_unref (req->download_request);
}
if (req->target_request != NULL) {
download_request_unref (req->target_request);
}
g_free (req);
};
static gboolean
gst_hls_demux_preloader_submit (GstHLSDemuxPreloader * preloader,
GstHLSDemuxPreloadRequest * preload_req, const gchar * referrer_uri);
static void gst_hls_demux_preloader_release_request (GstHLSDemuxPreloader *
preloader, GstHLSDemuxPreloadRequest * preload_req,
gboolean cancel_download);
GstHLSDemuxPreloader *
gst_hls_demux_preloader_new (DownloadHelper * download_helper)
{
GstHLSDemuxPreloader *preloader = g_new0 (GstHLSDemuxPreloader, 1);
preloader->download_helper = download_helper;
preloader->active_preloads = g_ptr_array_new ();
return preloader;
}
void
gst_hls_demux_preloader_free (GstHLSDemuxPreloader * preloader)
{
gst_hls_demux_preloader_cancel (preloader, M3U8_PRELOAD_HINT_ALL);
g_ptr_array_free (preloader->active_preloads, TRUE);
g_free (preloader);
}
void
gst_hls_demux_preloader_load (GstHLSDemuxPreloader * preloader,
GstM3U8PreloadHint * hint, const gchar * referrer_uri)
{
/* Check if we have an active preload already for this hint */
guint idx;
for (idx = 0; idx < preloader->active_preloads->len; idx++) {
GstHLSDemuxPreloadRequest *req =
g_ptr_array_index (preloader->active_preloads, idx);
if (hint->hint_type == req->hint->hint_type) {
/* We already have an active hint of this type. If this new one is different, cancel
* the active preload before starting this one */
if (gst_m3u8_preload_hint_equal (hint, req->hint)) {
GST_LOG ("Ignoring pre-existing preload of type %d uri: %s, range:%"
G_GINT64_FORMAT " size %" G_GINT64_FORMAT, hint->hint_type,
hint->uri, hint->offset, hint->size);
return; /* Nothing to do */
}
gst_hls_demux_preloader_release_request (preloader, req, TRUE);
g_ptr_array_remove_index_fast (preloader->active_preloads, idx);
break;
}
}
/* If we get here, then there's no preload of this type. Create one */
GstHLSDemuxPreloadRequest *req =
gst_hls_demux_preload_request_new (preloader, hint);
/* Submit the request */
if (gst_hls_demux_preloader_submit (preloader, req, referrer_uri)) {
g_ptr_array_add (preloader->active_preloads, req);
} else {
/* Discard failed request */
gst_hls_demux_preloader_release_request (preloader, req, TRUE);
}
}
void
gst_hls_demux_preloader_cancel (GstHLSDemuxPreloader * preloader,
GstM3U8PreloadHintType hint_types)
{
/* Go through the active downloads and remove/cancel any with the matching type */
guint idx;
for (idx = 0; idx < preloader->active_preloads->len;) {
GstHLSDemuxPreloadRequest *req =
g_ptr_array_index (preloader->active_preloads, idx);
if (hint_types & req->hint->hint_type) {
gst_hls_demux_preloader_release_request (preloader, req, TRUE);
g_ptr_array_remove_index_fast (preloader->active_preloads, idx);
continue; /* Don't increment idx++, as we just removed an item */
}
idx++;
}
}
/* This function transfers any available data to the target request, and possibly
* completes it and removes it from the preload */
static void
gst_hls_demux_preloader_despatch (GstHLSDemuxPreloadRequest * preload_req,
gboolean input_is_finished)
{
GstHLSDemuxPreloader *preloader = preload_req->preloader;
if (input_is_finished)
preload_req->download_is_finished = TRUE;
else
input_is_finished = preload_req->download_is_finished;
/* If there is a target request, see if any of our data should be
* transferred to it, and if it should be despatched as complete */
if (preload_req->target_request != NULL) {
gboolean output_is_finished = input_is_finished;
gboolean despatch_progress = FALSE;
download_request_lock (preload_req->target_request);
download_request_lock (preload_req->download_request);
DownloadRequestState target_state = preload_req->download_request->state;
/* Transfer the http status code */
preload_req->target_request->status_code =
preload_req->download_request->status_code;
GstBuffer *target_buf =
download_request_take_buffer_range (preload_req->download_request,
preload_req->target_cur_offset,
preload_req->target_request->range_end);
if (target_buf != NULL) {
DownloadRequest *req = preload_req->target_request;
/* Deliver data to the target, and update our tracked output position */
preload_req->target_cur_offset =
GST_BUFFER_OFFSET (target_buf) + gst_buffer_get_size (target_buf);
GST_LOG ("Adding %" G_GSIZE_FORMAT " bytes at offset %" G_GUINT64_FORMAT
" to target download request uri %s range %" G_GINT64_FORMAT " - %"
G_GINT64_FORMAT, gst_buffer_get_size (target_buf),
GST_BUFFER_OFFSET (target_buf), req->uri, req->range_start,
req->range_end);
download_request_add_buffer (req, target_buf);
despatch_progress = TRUE; /* Added a buffer, despatch progress callback */
if (req->range_end != -1
&& preload_req->target_cur_offset > req->range_end) {
/* We've delivered all data to satisfy the requested byte range - the target request is complete */
if (target_state == DOWNLOAD_REQUEST_STATE_LOADING) {
target_state = DOWNLOAD_REQUEST_STATE_COMPLETE;
GST_LOG ("target download request uri %s range %" G_GINT64_FORMAT
" - %" G_GINT64_FORMAT " is fully satisfied. Completing",
req->uri, req->range_start, req->range_end);
}
output_is_finished = TRUE;
}
}
/* Update the target request's state, which may have been adjusted from the
* input request's state */
preload_req->target_request->state = target_state;
/* FIXME: Transfer timing from the input download as best we can, so the receiver can
* calculate bitrates */
/* We're done with the input download request . */
download_request_unlock (preload_req->download_request);
if (output_is_finished) {
DownloadRequest *req = preload_req->target_request;
GST_DEBUG ("Finishing target preload request uri: %s, start: %"
G_GINT64_FORMAT " end: %" G_GINT64_FORMAT, req->uri, req->range_start,
req->range_end);
download_request_despatch_completion (req);
download_request_unlock (req);
download_request_unref (req);
preload_req->target_request = NULL;
} else if (despatch_progress) {
DownloadRequest *req = preload_req->target_request;
download_request_despatch_progress (req);
}
/* Unlock if the target request didn't get released above */
if (preload_req->target_request != NULL) {
download_request_unlock (preload_req->target_request);
}
}
if (input_is_finished) {
if (preload_req->download_request == NULL
|| download_request_get_bytes_available (preload_req->download_request)
== 0) {
GstM3U8PreloadHint *hint = preload_req->hint;
GST_DEBUG ("Removing finished+drained preload type %d uri: %s, start: %"
G_GINT64_FORMAT " size: %" G_GINT64_FORMAT, hint->hint_type,
hint->uri, hint->offset, hint->size);
/* The incoming request is complete and the data is drained. Remove this preload request from the list */
g_ptr_array_remove_fast (preloader->active_preloads, preload_req);
gst_hls_demux_preloader_release_request (preloader, preload_req, FALSE);
}
}
}
static void
on_download_cancellation (DownloadRequest * request, DownloadRequestState state,
GstHLSDemuxPreloadRequest * preload_req)
{
gst_hls_demux_preloader_despatch (preload_req, TRUE);
}
static void
on_download_error (DownloadRequest * request, DownloadRequestState state,
GstHLSDemuxPreloadRequest * preload_req)
{
GstM3U8PreloadHint *hint = preload_req->hint;
GST_DEBUG ("preload type %d uri: %s download error", hint->hint_type,
hint->uri);
/* FIXME: Should we attempt to re-request a preload? Should we check if
* any part was transferred to the target request already? Should we
* attempt to request a byte range with a new start position if we
* already despatched data to other requests?
*/
gst_hls_demux_preloader_despatch (preload_req, TRUE);
}
static void
on_download_progress (DownloadRequest * request, DownloadRequestState state,
GstHLSDemuxPreloadRequest * preload_req)
{
GstM3U8PreloadHint *hint = preload_req->hint;
GST_DEBUG ("preload type %d uri: %s download progress. position %"
G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT " bytes", hint->hint_type,
hint->uri,
preload_req->download_cur_offset +
download_request_get_bytes_available (request), request->content_length);
preload_req->download_content_length = request->content_length;
gst_hls_demux_preloader_despatch (preload_req, FALSE);
}
static void
on_download_complete (DownloadRequest * request, DownloadRequestState state,
GstHLSDemuxPreloadRequest * preload_req)
{
GstM3U8PreloadHint *hint = preload_req->hint;
GST_DEBUG ("preload type %d uri: %s download complete. position %"
G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT " bytes", hint->hint_type,
hint->uri,
preload_req->download_cur_offset +
download_request_get_bytes_available (request), request->content_length);
preload_req->download_content_length = request->content_length;
gst_hls_demux_preloader_despatch (preload_req, TRUE);
}
static gboolean
gst_hls_demux_preloader_submit (GstHLSDemuxPreloader * preloader,
GstHLSDemuxPreloadRequest * preload_req, const gchar * referrer_uri)
{
g_assert (preload_req->download_request == NULL);
DownloadRequest *download_req = download_request_new ();
GstM3U8PreloadHint *hint = preload_req->hint;
/* Configure our download request */
gint64 end = RFC8673_LAST_BYTE_POS;
if (hint->size > 0) {
end = hint->offset + hint->size - 1;
}
download_request_set_uri (download_req, hint->uri, hint->offset, end);
download_request_set_callbacks (download_req,
(DownloadRequestEventCallback) on_download_complete,
(DownloadRequestEventCallback) on_download_error,
(DownloadRequestEventCallback) on_download_cancellation,
(DownloadRequestEventCallback) on_download_progress, preload_req);
GST_DEBUG ("Submitting preload type %d uri: %s, range:%" G_GINT64_FORMAT
" - %" G_GINT64_FORMAT, hint->hint_type, hint->uri, hint->offset, end);
if (!downloadhelper_submit_request (preloader->download_helper,
referrer_uri, DOWNLOAD_FLAG_NONE, download_req, NULL)) {
/* Abandon the request */
download_request_unref (download_req);
return FALSE;
}
/* Store the current read offset */
preload_req->download_cur_offset = hint->offset;
preload_req->download_request = download_req;
preload_req->download_is_finished = FALSE;
return TRUE;
}
static void
gst_hls_demux_preloader_release_request (GstHLSDemuxPreloader * preloader,
GstHLSDemuxPreloadRequest * preload_req, gboolean cancel_download)
{
if (preload_req->download_request) {
if (cancel_download) {
GstM3U8PreloadHint *hint = preload_req->hint;
GST_DEBUG ("Cancelling preload type %d uri: %s, range start:%"
G_GINT64_FORMAT " size %" G_GINT64_FORMAT, hint->hint_type, hint->uri,
hint->offset, hint->size);
downloadhelper_cancel_request (preloader->download_helper,
preload_req->download_request);
}
}
gst_hls_demux_preload_request_free (preload_req);
}
/* See if we can satisfy a download request from a preload, and fulfil it if so.
* There are several cases:
* * The URI and range exactly match one of our preloads -> OK
* * The URI matches, and the requested range is a subset of the preload -> OK
* * The URI matches, but the requested range is outside what's available in the preload
* and can't be provided.
*
* Within those options, there are sub-possibilities:
* * The preload request is ongoing. It might have enough data already to completely provide
* the requested range.
* * The preload request is ongoing, but has already moved past the requested range (no longer available)
* * The preload request is ongoing, will feed data to the target req as it arrives
* * The preload request is complete already, so can either provide the requested range or not, but
* also needs to mark the target_req as completed once it has passed the required data.
*/
gboolean
gst_hls_demux_preloader_provide_request (GstHLSDemuxPreloader * preloader,
DownloadRequest * target_req)
{
guint idx;
for (idx = 0; idx < preloader->active_preloads->len; idx++) {
GstHLSDemuxPreloadRequest *preload_req =
g_ptr_array_index (preloader->active_preloads, idx);
GstM3U8PreloadHint *hint = preload_req->hint;
if (!g_str_equal (hint->uri, target_req->uri))
continue;
GST_LOG ("Possible matching preload type %d uri: %s, range start:%"
G_GINT64_FORMAT " size %" G_GINT64_FORMAT " (download position %"
G_GUINT64_FORMAT ") for req with range %" G_GINT64_FORMAT " to %"
G_GINT64_FORMAT, hint->hint_type, hint->uri, hint->offset, hint->size,
preload_req->download_cur_offset, target_req->range_start,
target_req->range_end);
if (target_req->range_start > preload_req->download_cur_offset) {
/* This preload request is for a byte range beyond the desired
* position (or something already consumed the target data) */
GST_LOG ("Range start didn't match");
continue;
}
if (target_req->range_end != -1) {
/* The target request does not want the entire rest of the preload
* stream, so check that the end is satisfiable */
gint64 content_length = preload_req->download_content_length;
if (content_length == 0) {
/* We don't have information from the preload download's response headers yet,
* so check against the requested length and error out later if the server
* doesn't provide all the desired response */
if (hint->size != -1)
content_length = hint->size;
}
if (content_length != 0) {
/* We have some idea of the content length. Check if it will provide the requested
* range */
if (target_req->range_end > hint->offset + content_length - 1) {
GST_LOG ("Range end %" G_GINT64_FORMAT " is beyond the end (%"
G_GINT64_FORMAT ") of this preload", target_req->range_end,
hint->offset + content_length - 1);
continue;
}
}
}
GST_DEBUG ("Found a matching preload type %d uri: %s, range start:%"
G_GINT64_FORMAT " size %" G_GINT64_FORMAT, hint->hint_type, hint->uri,
hint->offset, hint->size);
if (preload_req->target_request != NULL) {
DownloadRequest *old_request = preload_req->target_request;
/* Detach the existing target request */
if (old_request != target_req) {
download_request_lock (old_request);
old_request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
download_request_despatch_completion (old_request);
download_request_unlock (old_request);
}
download_request_unref (old_request);
preload_req->target_request = NULL;
}
/* Attach the new target request and despatch any available data */
preload_req->target_cur_offset = target_req->range_start;
preload_req->target_request = download_request_ref (target_req);
download_request_lock (target_req);
target_req->state = DOWNLOAD_REQUEST_STATE_UNSENT;
download_request_begin_download (target_req);
download_request_unlock (target_req);
gst_hls_demux_preloader_despatch (preload_req, FALSE);
return TRUE;
}
return FALSE;
}