adaptivedemux2: Add new flow return value for BUSY and PREPARE stream state

Neither are used yet, they're just placeholders.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3883>
This commit is contained in:
Jan Schmidt 2022-12-09 00:54:47 +11:00 committed by GStreamer Marge Bot
parent b03e68ea8c
commit 82839fb82f
4 changed files with 113 additions and 7 deletions

View file

@ -40,6 +40,7 @@ G_BEGIN_DECLS
/* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */ /* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */
#define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 2) #define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 2)
#define GST_ADAPTIVE_DEMUX_FLOW_BUSY (GST_FLOW_CUSTOM_SUCCESS_2 + 3)
#define TRACKS_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->tracks_lock) #define TRACKS_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->tracks_lock)
#define TRACKS_LOCK(d) g_mutex_lock (TRACKS_GET_LOCK (d)) #define TRACKS_LOCK(d) g_mutex_lock (TRACKS_GET_LOCK (d))
@ -100,6 +101,11 @@ struct _GstAdaptiveDemuxPrivate
/* Set to TRUE if any stream is waiting on the manifest update */ /* Set to TRUE if any stream is waiting on the manifest update */
gboolean stream_waiting_for_manifest; gboolean stream_waiting_for_manifest;
/* Set to TRUE if streams can download fragment data. If FALSE,
* they can load playlists / prepare for updata_fragment_info()
*/
gboolean streams_can_download_fragments;
GMutex api_lock; GMutex api_lock;
/* Protects demux and stream segment information /* Protects demux and stream segment information
@ -181,6 +187,7 @@ gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
void gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream); void gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream);
void gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *stream); void gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *stream);
void gst_adaptive_demux2_stream_on_can_download_fragments(GstAdaptiveDemux2Stream *stream);
gboolean gst_adaptive_demux2_stream_has_next_fragment (GstAdaptiveDemux2Stream * stream); gboolean gst_adaptive_demux2_stream_has_next_fragment (GstAdaptiveDemux2Stream * stream);
GstFlowReturn gst_adaptive_demux2_stream_seek (GstAdaptiveDemux2Stream * stream, GstFlowReturn gst_adaptive_demux2_stream_seek (GstAdaptiveDemux2Stream * stream,

View file

@ -1759,6 +1759,46 @@ gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream)
gst_object_ref (stream), (GDestroyNotify) gst_object_unref); gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
} }
void
gst_adaptive_demux2_stream_on_can_download_fragments (GstAdaptiveDemux2Stream *
stream)
{
GstAdaptiveDemux *demux = stream->demux;
if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_BEFORE_DOWNLOAD)
return;
g_assert (stream->pending_cb_id == 0);
GST_LOG_OBJECT (stream, "Scheduling load_a_fragment() call");
stream->pending_cb_id =
gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
(GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
}
/*
* Called by a subclass that has returned GST_ADAPTIVE_DEMUX_FLOW_BUSY
* from update_fragment_info() to indicate that it is ready to continue
* downloading now.
*/
void
gst_adaptive_demux2_stream_mark_prepared (GstAdaptiveDemux2Stream * stream)
{
GstAdaptiveDemux *demux = stream->demux;
if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_PREPARE)
return;
g_assert (stream->pending_cb_id == 0);
GST_LOG_OBJECT (stream, "Scheduling load_a_fragment() call");
stream->pending_cb_id =
gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
(GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
}
static void static void
gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream * gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
stream) stream)
@ -1800,9 +1840,11 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
switch (stream->state) { switch (stream->state) {
case GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART: case GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART:
case GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT: case GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT:
case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_PREPARE:
case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE: case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE:
case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE: case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE:
case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE: case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_BEFORE_DOWNLOAD:
/* Get information about the fragment to download */ /* Get information about the fragment to download */
GST_DEBUG_OBJECT (demux, "Calling update_fragment_info"); GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
ret = gst_adaptive_demux2_stream_update_fragment_info (stream); ret = gst_adaptive_demux2_stream_update_fragment_info (stream);
@ -1827,6 +1869,13 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
break; break;
} }
if (ret == GST_ADAPTIVE_DEMUX_FLOW_BUSY) {
GST_LOG_OBJECT (stream,
"Sub-class returned BUSY flow return. Waiting in PREPARE state");
stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_PREPARE;
return FALSE;
}
if (ret == GST_FLOW_OK) { if (ret == GST_FLOW_OK) {
/* Wait for room in the output tracks */ /* Wait for room in the output tracks */
if (gst_adaptive_demux2_stream_wait_for_output_space (demux, stream, if (gst_adaptive_demux2_stream_wait_for_output_space (demux, stream,
@ -1857,10 +1906,12 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
return FALSE; return FALSE;
} }
} }
}
if (gst_adaptive_demux2_stream_download_fragment (stream) != GST_FLOW_OK) { if (ret == GST_FLOW_OK) {
GST_ERROR_OBJECT (demux, if (!demux->priv->streams_can_download_fragments) {
"Failed to begin fragment download for stream %p", stream); GST_LOG_OBJECT (stream, "Waiting for fragment downloads to be unblocked");
stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_BEFORE_DOWNLOAD;
return FALSE; return FALSE;
} }
} }
@ -1869,7 +1920,13 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
* GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC is not in the GstFlowReturn enum */ * GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC is not in the GstFlowReturn enum */
switch ((int) ret) { switch ((int) ret) {
case GST_FLOW_OK: case GST_FLOW_OK:
break; /* all is good, let's go */ /* all is good, let's go */
if (gst_adaptive_demux2_stream_download_fragment (stream) != GST_FLOW_OK) {
GST_ERROR_OBJECT (demux,
"Failed to begin fragment download for stream %p", stream);
return FALSE;
}
break;
case GST_FLOW_EOS: case GST_FLOW_EOS:
GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop"); GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
stream->last_ret = ret; stream->last_ret = ret;

View file

@ -82,9 +82,11 @@ enum _GstAdaptiveDemux2StreamState {
GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED, /* Stream was stopped */ GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED, /* Stream was stopped */
GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART, /* Stream stopped but needs restart logic */ GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART, /* Stream stopped but needs restart logic */
GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT, GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT,
GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_PREPARE, /* Sub-class is busy and can't update_fragment_info() yet */
GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE, GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE,
GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE, GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE,
GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE, GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE,
GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_BEFORE_DOWNLOAD, /* Ready, but not allowed to download yet */
GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING, GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING,
GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS, GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS,
GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED
@ -101,8 +103,9 @@ struct _GstAdaptiveDemux2StreamClass
* Requests the stream to set the information about the current fragment to its * Requests the stream to set the information about the current fragment to its
* current fragment struct * current fragment struct
* *
* Returns: #GST_FLOW_OK in success, #GST_FLOW_ERROR on error and #GST_FLOW_EOS * Returns: #GST_FLOW_OK in success, #GST_FLOW_ERROR on error, #GST_FLOW_EOS
* if there is no fragment. * if there is no fragment, or the custom GST_ADAPTIVE_DEMUX_FLOW_BUSY
* if the sub-class is still preparing.
*/ */
GstFlowReturn (*update_fragment_info) (GstAdaptiveDemux2Stream * stream); GstFlowReturn (*update_fragment_info) (GstAdaptiveDemux2Stream * stream);
@ -360,6 +363,8 @@ GstFlowReturn gst_adaptive_demux2_stream_advance_fragment (GstAdaptiveDemux2Stre
gboolean gst_adaptive_demux2_stream_handle_collection (GstAdaptiveDemux2Stream *stream, gboolean gst_adaptive_demux2_stream_handle_collection (GstAdaptiveDemux2Stream *stream,
GstStreamCollection *collection, gboolean *had_pending_tracks); GstStreamCollection *collection, gboolean *had_pending_tracks);
void gst_adaptive_demux2_stream_mark_prepared(GstAdaptiveDemux2Stream *stream);
void gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f); void gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f);
G_END_DECLS G_END_DECLS

View file

@ -804,6 +804,41 @@ gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
return slot; return slot;
} }
static gboolean
gst_adaptive_demux_scheduler_unblock_fragment_downloads_cb (GstAdaptiveDemux *
demux)
{
GList *iter;
GST_INFO_OBJECT (demux, "Unblocking streams' fragment downloads");
demux->priv->streams_can_download_fragments = TRUE;
iter = demux->input_period->streams;
for (; iter; iter = g_list_next (iter)) {
GstAdaptiveDemux2Stream *stream = iter->data;
gst_adaptive_demux2_stream_on_can_download_fragments (stream);
}
return FALSE;
}
/* must be called with the scheduler lock */
static void
gst_adaptive_demux_set_streams_can_download_fragments (GstAdaptiveDemux * demux,
gboolean streams_can_download_fragments)
{
if (streams_can_download_fragments) {
gst_adaptive_demux_loop_call (demux->priv->scheduler_task, (GSourceFunc)
gst_adaptive_demux_scheduler_unblock_fragment_downloads_cb, demux,
NULL);
} else {
demux->priv->streams_can_download_fragments =
streams_can_download_fragments;
}
}
/* Called: /* Called:
* * After `process_manifest` or when a period starts * * After `process_manifest` or when a period starts
* * Or when all tracks have been created * * Or when all tracks have been created
@ -1020,6 +1055,8 @@ handle_incoming_manifest (GstAdaptiveDemux * demux)
gst_adaptive_demux_prepare_streams (demux, gst_adaptive_demux_prepare_streams (demux,
gst_adaptive_demux_is_live (demux)); gst_adaptive_demux_is_live (demux));
gst_adaptive_demux_set_streams_can_download_fragments (demux, TRUE);
gst_adaptive_demux_start_tasks (demux); gst_adaptive_demux_start_tasks (demux);
gst_adaptive_demux_start_manifest_update_task (demux); gst_adaptive_demux_start_manifest_update_task (demux);
@ -3711,7 +3748,7 @@ gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
return ret; return ret;
} }
/* must be called with manifest_lock taken */ /* must be called from the scheduler task */
void void
gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux) gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
{ {