adaptivedemux2: Add gst_adaptive_demux2_stream_wait_prepared()

Add a method that waits for a stream to signal the prepare_cond after it returns
a BUSY flow return.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3883>
This commit is contained in:
Jan Schmidt 2022-12-31 23:22:29 +11:00 committed by GStreamer Marge Bot
parent d3acafbb5a
commit 44d3751d68
3 changed files with 79 additions and 4 deletions

View file

@ -91,6 +91,9 @@ gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream)
stream->start_position = stream->current_position = GST_CLOCK_TIME_NONE;
g_mutex_init (&stream->prepare_lock);
g_cond_init (&stream->prepare_cond);
gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME);
}
@ -148,6 +151,9 @@ gst_adaptive_demux2_stream_finalize (GObject * object)
gst_clear_tag_list (&stream->pending_tags);
g_clear_pointer (&stream->stream_collection, gst_object_unref);
g_mutex_clear (&stream->prepare_lock);
g_cond_clear (&stream->prepare_cond);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -1787,12 +1793,21 @@ gst_adaptive_demux2_stream_on_can_download_fragments (GstAdaptiveDemux2Stream *
* Called by a subclass that has returned GST_ADAPTIVE_DEMUX_FLOW_BUSY
* from update_fragment_info() to indicate that it is ready to continue
* downloading now.
*
* Called from the scheduler task
*/
void
gst_adaptive_demux2_stream_mark_prepared (GstAdaptiveDemux2Stream * stream)
{
GstAdaptiveDemux *demux = stream->demux;
/* hlsdemux calls this method whenever a playlist is updated, so also
* use it to wake up a stream that's waiting at the live edge */
if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE) {
gst_adaptive_demux2_stream_on_manifest_update (stream);
}
g_cond_broadcast (&stream->prepare_cond);
if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_PREPARE)
return;
@ -1805,6 +1820,22 @@ gst_adaptive_demux2_stream_mark_prepared (GstAdaptiveDemux2Stream * stream)
gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
}
/* Called by external threads (manifest input on sinkpad, and seek handling)
* when it requires the stream to be prepared before they can continue
* Must be held with the SCHEDULER lock held */
gboolean
gst_adaptive_demux2_stream_wait_prepared (GstAdaptiveDemux2Stream * stream)
{
GstAdaptiveDemux *demux = stream->demux;
g_mutex_lock (&stream->prepare_lock);
GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
g_cond_wait (&stream->prepare_cond, &stream->prepare_lock);
g_mutex_unlock (&stream->prepare_lock);
return GST_ADAPTIVE_SCHEDULER_LOCK (demux);
}
static void
gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
stream)
@ -1857,8 +1888,12 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
GST_DEBUG_OBJECT (stream,
"Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
if (ret == GST_FLOW_OK)
if (ret == GST_FLOW_OK) {
/* Wake anyone that's waiting for this stream to get prepared */
if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_PREPARE)
g_cond_broadcast (&stream->prepare_cond);
stream->starting_fragment = TRUE;
}
break;
case GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING:
break;
@ -1878,7 +1913,12 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
if (ret == GST_ADAPTIVE_DEMUX_FLOW_BUSY) {
GST_LOG_OBJECT (stream,
"Sub-class returned BUSY flow return. Waiting in PREPARE state");
/* Need to take the prepare lock specifically when switching
* to WAITING_PREPARE state, to avoid a race in _wait_prepared();
*/
g_mutex_lock (&stream->prepare_lock);
stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_PREPARE;
g_mutex_unlock (&stream->prepare_lock);
return FALSE;
}
@ -1941,6 +1981,7 @@ gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
case GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC:
GST_DEBUG_OBJECT (stream, "Lost sync, asking reset to current position");
stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
g_cond_broadcast (&stream->prepare_cond);
gst_adaptive_demux_handle_lost_sync (demux);
return FALSE;
case GST_FLOW_NOT_LINKED:
@ -2140,6 +2181,7 @@ gst_adaptive_demux2_stream_stop_default (GstAdaptiveDemux2Stream * stream)
GST_DEBUG_OBJECT (stream, "Stopping stream (from state %d)", stream->state);
stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
g_cond_broadcast (&stream->prepare_cond);
if (stream->pending_cb_id != 0) {
gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,

View file

@ -309,6 +309,10 @@ struct _GstAdaptiveDemux2Stream
GstAdaptiveDemux2StreamState state;
guint pending_cb_id;
gboolean download_active;
GMutex prepare_lock;
GCond prepare_cond;
/* The (global output) time at which this stream should be woken
* to download more input */
GstClockTimeDiff next_input_wakeup_time;
@ -376,6 +380,7 @@ gboolean gst_adaptive_demux2_stream_handle_collection (GstAdaptiveDemux2Stream *
GstStreamCollection *collection, gboolean *had_pending_tracks);
void gst_adaptive_demux2_stream_mark_prepared(GstAdaptiveDemux2Stream *stream);
gboolean gst_adaptive_demux2_stream_wait_prepared(GstAdaptiveDemux2Stream *stream);
void gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f);

View file

@ -1683,7 +1683,21 @@ gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
/* TODO we only need the first timestamp, maybe create a simple function to
* get the current PTS of a fragment ? */
GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
gst_adaptive_demux2_stream_update_fragment_info (stream);
GstFlowReturn flow_ret =
gst_adaptive_demux2_stream_update_fragment_info (stream);
/* Handle fragment info waiting on BUSY */
while (flow_ret == GST_ADAPTIVE_DEMUX_FLOW_BUSY) {
if (!gst_adaptive_demux2_stream_wait_prepared (stream))
break;
flow_ret = gst_adaptive_demux2_stream_update_fragment_info (stream);
}
if (flow_ret != GST_FLOW_OK) {
GST_WARNING_OBJECT (stream, "Could not update fragment info. flow: %s",
gst_flow_get_name (flow_ret));
continue;
}
GST_DEBUG_OBJECT (stream,
"Got stream time %" GST_STIME_FORMAT,
@ -2285,8 +2299,22 @@ gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
}
}
if (gst_adaptive_demux2_stream_seek (stream, rate >= 0, stream_seek_flags,
ts, &ts) != GST_FLOW_OK) {
GstFlowReturn flow_ret =
gst_adaptive_demux2_stream_seek (stream, rate >= 0, stream_seek_flags,
ts, &ts);
/* Handle fragment info waiting on BUSY */
while (flow_ret == GST_ADAPTIVE_DEMUX_FLOW_BUSY) {
if (!gst_adaptive_demux2_stream_wait_prepared (stream))
break;
flow_ret = gst_adaptive_demux2_stream_update_fragment_info (stream);
}
if (flow_ret != GST_FLOW_OK) {
GST_DEBUG_OBJECT (demux,
"Seek on stream %" GST_PTR_FORMAT " failed with flow return %s",
stream, gst_flow_get_name (flow_ret));
GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
GST_API_UNLOCK (demux);